blob: 99f0036527e333e8f88226ec2293d32d6a2cd0a9 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.services;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Provider;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
import org.apache.atlas.EntityAuditEvent;
import org.apache.atlas.RequestContext;
import org.apache.atlas.classification.InterfaceAudience;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.listener.ActiveStateChangeHandler;
import org.apache.atlas.listener.EntityChangeListener;
import org.apache.atlas.listener.TypesChangeListener;
import org.apache.atlas.repository.MetadataRepository;
import org.apache.atlas.repository.RepositoryException;
import org.apache.atlas.repository.audit.EntityAuditRepository;
import org.apache.atlas.repository.typestore.ITypeStore;
import org.apache.atlas.typesystem.IStruct;
import org.apache.atlas.typesystem.ITypedReferenceableInstance;
import org.apache.atlas.typesystem.ITypedStruct;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.Struct;
import org.apache.atlas.typesystem.TypesDef;
import org.apache.atlas.typesystem.exception.EntityNotFoundException;
import org.apache.atlas.typesystem.exception.TypeNotFoundException;
import org.apache.atlas.typesystem.json.InstanceSerialization;
import org.apache.atlas.typesystem.json.TypesSerialization;
import org.apache.atlas.typesystem.persistence.Id;
import org.apache.atlas.typesystem.persistence.ReferenceableInstance;
import org.apache.atlas.typesystem.types.AttributeDefinition;
import org.apache.atlas.typesystem.types.AttributeInfo;
import org.apache.atlas.typesystem.types.ClassType;
import org.apache.atlas.typesystem.types.DataTypes;
import org.apache.atlas.typesystem.types.EnumTypeDefinition;
import org.apache.atlas.typesystem.types.HierarchicalTypeDefinition;
import org.apache.atlas.typesystem.types.IDataType;
import org.apache.atlas.typesystem.types.Multiplicity;
import org.apache.atlas.typesystem.types.StructTypeDefinition;
import org.apache.atlas.typesystem.types.TraitType;
import org.apache.atlas.typesystem.types.TypeSystem;
import org.apache.atlas.typesystem.types.ValueConversionException;
import org.apache.atlas.typesystem.types.cache.TypeCache;
import org.apache.atlas.typesystem.types.utils.TypesUtil;
import org.apache.atlas.utils.ParamChecker;
import org.apache.commons.configuration.Configuration;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.inject.Inject;
import javax.inject.Singleton;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import static org.apache.atlas.AtlasClient.PROCESS_ATTRIBUTE_INPUTS;
import static org.apache.atlas.AtlasClient.PROCESS_ATTRIBUTE_OUTPUTS;
/**
* Simple wrapper over TypeSystem and MetadataRepository services with hooks
* for listening to changes to the repository.
*/
@Singleton
public class DefaultMetadataService implements MetadataService, ActiveStateChangeHandler {
private static final Logger LOG = LoggerFactory.getLogger(DefaultMetadataService.class);
private final short maxAuditResults;
private static final String CONFIG_MAX_AUDIT_RESULTS = "atlas.audit.maxResults";
private static final short DEFAULT_MAX_AUDIT_RESULTS = 1000;
private final TypeSystem typeSystem;
private final MetadataRepository repository;
private final ITypeStore typeStore;
private IBootstrapTypesRegistrar typesRegistrar;
private final Collection<TypesChangeListener> typeChangeListeners = new LinkedHashSet<>();
private final Collection<EntityChangeListener> entityChangeListeners = new LinkedHashSet<>();
private boolean wasInitialized = false;
@Inject
private EntityAuditRepository auditRepository;
@Inject
DefaultMetadataService(final MetadataRepository repository, final ITypeStore typeStore,
final IBootstrapTypesRegistrar typesRegistrar,
final Collection<Provider<TypesChangeListener>> typeListenerProviders,
final Collection<Provider<EntityChangeListener>> entityListenerProviders, TypeCache typeCache)
throws AtlasException {
this(repository, typeStore, typesRegistrar, typeListenerProviders, entityListenerProviders,
TypeSystem.getInstance(), ApplicationProperties.get(), typeCache);
}
DefaultMetadataService(final MetadataRepository repository, final ITypeStore typeStore,
final IBootstrapTypesRegistrar typesRegistrar,
final Collection<Provider<TypesChangeListener>> typeListenerProviders,
final Collection<Provider<EntityChangeListener>> entityListenerProviders,
final TypeSystem typeSystem,
final Configuration configuration, TypeCache typeCache) throws AtlasException {
this.typeStore = typeStore;
this.typesRegistrar = typesRegistrar;
this.typeSystem = typeSystem;
/**
* Ideally a TypeCache implementation should have been injected in the TypeSystemProvider,
* but a singleton of TypeSystem is constructed privately within the class so that
* clients of TypeSystem would never instantiate a TypeSystem object directly in
* their code. As soon as a client makes a call to TypeSystem.getInstance(), they
* should have the singleton ready for consumption. Manually inject TypeSystem with
* the Guice-instantiated type cache here, before types are restored.
* This allows cache implementations to participate in Guice dependency injection.
*/
this.typeSystem.setTypeCache(typeCache);
this.repository = repository;
for (Provider<TypesChangeListener> provider : typeListenerProviders) {
typeChangeListeners.add(provider.get());
}
for (Provider<EntityChangeListener> provider : entityListenerProviders) {
entityChangeListeners.add(provider.get());
}
if (!HAConfiguration.isHAEnabled(configuration)) {
restoreTypeSystem();
}
maxAuditResults = configuration.getShort(CONFIG_MAX_AUDIT_RESULTS, DEFAULT_MAX_AUDIT_RESULTS);
}
private void restoreTypeSystem() throws AtlasException {
LOG.info("Restoring type system from the store");
TypesDef typesDef = typeStore.restore();
if (!wasInitialized) {
LOG.info("Initializing type system for the first time.");
typeSystem.defineTypes(typesDef);
// restore types before creating super types
createSuperTypes();
typesRegistrar.registerTypes(ReservedTypesRegistrar.getTypesDir(), typeSystem, this);
wasInitialized = true;
} else {
LOG.info("Type system was already initialized, refreshing cache.");
refreshCache(typesDef);
}
LOG.info("Restored type system from the store");
}
private void refreshCache(TypesDef typesDef) throws AtlasException {
TypeSystem.TransientTypeSystem transientTypeSystem
= typeSystem.createTransientTypeSystem(typesDef, true);
Map<String, IDataType> typesAdded = transientTypeSystem.getTypesAdded();
LOG.info("Number of types got from transient type system: " + typesAdded.size());
typeSystem.commitTypes(typesAdded);
}
@InterfaceAudience.Private
private void createSuperTypes() throws AtlasException {
HierarchicalTypeDefinition<ClassType> referenceableType = TypesUtil
.createClassTypeDef(AtlasClient.REFERENCEABLE_SUPER_TYPE, ImmutableSet.<String>of(),
TypesUtil.createUniqueRequiredAttrDef(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
DataTypes.STRING_TYPE));
createType(referenceableType);
HierarchicalTypeDefinition<ClassType> assetType = TypesUtil
.createClassTypeDef(AtlasClient.ASSET_TYPE, ImmutableSet.<String>of(),
TypesUtil.createRequiredAttrDef(AtlasClient.NAME, DataTypes.STRING_TYPE),
TypesUtil.createOptionalAttrDef(AtlasClient.DESCRIPTION, DataTypes.STRING_TYPE),
TypesUtil.createOptionalAttrDef(AtlasClient.OWNER, DataTypes.STRING_TYPE));
createType(assetType);
HierarchicalTypeDefinition<ClassType> infraType = TypesUtil
.createClassTypeDef(AtlasClient.INFRASTRUCTURE_SUPER_TYPE,
ImmutableSet.of(AtlasClient.REFERENCEABLE_SUPER_TYPE, AtlasClient.ASSET_TYPE));
createType(infraType);
HierarchicalTypeDefinition<ClassType> datasetType = TypesUtil
.createClassTypeDef(AtlasClient.DATA_SET_SUPER_TYPE,
ImmutableSet.of(AtlasClient.REFERENCEABLE_SUPER_TYPE, AtlasClient.ASSET_TYPE));
createType(datasetType);
HierarchicalTypeDefinition<ClassType> processType = TypesUtil
.createClassTypeDef(AtlasClient.PROCESS_SUPER_TYPE,
ImmutableSet.of(AtlasClient.REFERENCEABLE_SUPER_TYPE, AtlasClient.ASSET_TYPE),
new AttributeDefinition(PROCESS_ATTRIBUTE_INPUTS, DataTypes.arrayTypeName(AtlasClient.DATA_SET_SUPER_TYPE),
Multiplicity.OPTIONAL, false, null),
new AttributeDefinition(PROCESS_ATTRIBUTE_OUTPUTS, DataTypes.arrayTypeName(AtlasClient.DATA_SET_SUPER_TYPE),
Multiplicity.OPTIONAL, false, null));
createType(processType);
}
private void createType(HierarchicalTypeDefinition<ClassType> type) throws AtlasException {
if (!typeSystem.isRegistered(type.typeName)) {
TypesDef typesDef = TypesUtil.getTypesDef(ImmutableList.<EnumTypeDefinition>of(), ImmutableList.<StructTypeDefinition>of(),
ImmutableList.<HierarchicalTypeDefinition<TraitType>>of(),
ImmutableList.of(type));
createType(TypesSerialization.toJson(typesDef));
}
}
/**
* Creates a new type based on the type system to enable adding
* entities (instances for types).
*
* @param typeDefinition definition as json
* @return a unique id for this type
*/
@Override
public JSONObject createType(String typeDefinition) throws AtlasException {
return createOrUpdateTypes(typeDefinition, false);
}
private JSONObject createOrUpdateTypes(String typeDefinition, boolean isUpdate) throws AtlasException {
ParamChecker.notEmpty(typeDefinition, "type definition");
TypesDef typesDef = validateTypeDefinition(typeDefinition);
try {
final TypeSystem.TransientTypeSystem transientTypeSystem = typeSystem.createTransientTypeSystem(typesDef, isUpdate);
final Map<String, IDataType> typesAdded = transientTypeSystem.getTypesAdded();
try {
/* Create indexes first so that if index creation fails then we rollback
the typesystem and also do not persist the graph
*/
if (isUpdate) {
onTypesUpdated(typesAdded);
} else {
onTypesAdded(typesAdded);
}
typeStore.store(transientTypeSystem, ImmutableList.copyOf(typesAdded.keySet()));
typeSystem.commitTypes(typesAdded);
} catch (Throwable t) {
throw new AtlasException("Unable to persist types ", t);
}
return new JSONObject() {{
put(AtlasClient.TYPES, typesAdded.keySet());
}};
} catch (JSONException e) {
LOG.error("Unable to create response for types={}", typeDefinition, e);
throw new AtlasException("Unable to create response ", e);
}
}
@Override
public JSONObject updateType(String typeDefinition) throws AtlasException {
return createOrUpdateTypes(typeDefinition, true);
}
private TypesDef validateTypeDefinition(String typeDefinition) {
try {
TypesDef typesDef = TypesSerialization.fromJson(typeDefinition);
if (typesDef.isEmpty()) {
throw new IllegalArgumentException("Invalid type definition");
}
return typesDef;
} catch (Exception e) {
LOG.error("Unable to deserialize json={}", typeDefinition, e);
throw new IllegalArgumentException("Unable to deserialize json " + typeDefinition, e);
}
}
/**
* Return the definition for the given type.
*
* @param typeName name for this type, must be unique
* @return type definition as JSON
*/
@Override
public String getTypeDefinition(String typeName) throws AtlasException {
final IDataType dataType = typeSystem.getDataType(IDataType.class, typeName);
return TypesSerialization.toJson(typeSystem, dataType.getName());
}
/**
* Return the list of types in the repository.
*
* @return list of type names in the repository
*/
@Override
public List<String> getTypeNamesList() throws AtlasException {
return typeSystem.getTypeNames();
}
/**
* Return the list of trait type names in the type system.
*
* @return list of trait type names in the type system
*/
@Override
public List<String> getTypeNamesByCategory(DataTypes.TypeCategory typeCategory) throws AtlasException {
return typeSystem.getTypeNamesByCategory(typeCategory);
}
/**
* Creates an entity, instance of the type.
*
* @param entityInstanceDefinition json array of entity definitions
* @return guids - list of guids
*/
@Override
public List<String> createEntities(String entityInstanceDefinition) throws AtlasException {
ParamChecker.notEmpty(entityInstanceDefinition, "Entity instance definition");
ITypedReferenceableInstance[] typedInstances = deserializeClassInstances(entityInstanceDefinition);
return createEntities(typedInstances);
}
public List<String> createEntities(ITypedReferenceableInstance[] typedInstances) throws AtlasException {
final List<String> guids = repository.createEntities(typedInstances);
onEntitiesAdded(guids);
return guids;
}
private ITypedReferenceableInstance[] deserializeClassInstances(String entityInstanceDefinition)
throws AtlasException {
try {
JSONArray referableInstances = new JSONArray(entityInstanceDefinition);
ITypedReferenceableInstance[] instances = new ITypedReferenceableInstance[referableInstances.length()];
for (int index = 0; index < referableInstances.length(); index++) {
Referenceable entityInstance =
InstanceSerialization.fromJsonReferenceable(referableInstances.getString(index), true);
ITypedReferenceableInstance typedInstrance = getTypedReferenceableInstance(entityInstance);
instances[index] = typedInstrance;
}
return instances;
} catch(ValueConversionException | TypeNotFoundException e) {
throw e;
} catch (Exception e) { // exception from deserializer
LOG.error("Unable to deserialize json={}", entityInstanceDefinition, e);
throw new IllegalArgumentException("Unable to deserialize json", e);
}
}
@Override
public ITypedReferenceableInstance getTypedReferenceableInstance(Referenceable entityInstance) throws AtlasException {
final String entityTypeName = entityInstance.getTypeName();
ParamChecker.notEmpty(entityTypeName, "Entity type cannot be null");
ClassType entityType = typeSystem.getDataType(ClassType.class, entityTypeName);
//Both assigned id and values are required for full update
//classtype.convert() will remove values if id is assigned. So, set temp id, convert and
// then replace with original id
Id origId = entityInstance.getId();
entityInstance.replaceWithNewId(new Id(entityInstance.getTypeName()));
ITypedReferenceableInstance typedInstrance = entityType.convert(entityInstance, Multiplicity.REQUIRED);
((ReferenceableInstance)typedInstrance).replaceWithNewId(origId);
return typedInstrance;
}
/**
* Return the definition for the given guid.
*
* @param guid guid
* @return entity definition as JSON
*/
@Override
public String getEntityDefinition(String guid) throws AtlasException {
ParamChecker.notEmpty(guid, "entity id");
final ITypedReferenceableInstance instance = repository.getEntityDefinition(guid);
return InstanceSerialization.toJson(instance, true);
}
private ITypedReferenceableInstance getEntityDefinitionReference(String entityType, String attribute, String value)
throws AtlasException {
validateTypeExists(entityType);
validateUniqueAttribute(entityType, attribute);
return repository.getEntityDefinition(entityType, attribute, value);
}
@Override
public String getEntityDefinition(String entityType, String attribute, String value) throws AtlasException {
final ITypedReferenceableInstance instance = getEntityDefinitionReference(entityType, attribute, value);
return InstanceSerialization.toJson(instance, true);
}
/**
* Validate that attribute is unique attribute
* @param entityType the entity type
* @param attributeName the name of the attribute
*/
private void validateUniqueAttribute(String entityType, String attributeName) throws AtlasException {
ClassType type = typeSystem.getDataType(ClassType.class, entityType);
AttributeInfo attribute = type.fieldMapping().fields.get(attributeName);
if (!attribute.isUnique) {
throw new IllegalArgumentException(
String.format("%s.%s is not a unique attribute", entityType, attributeName));
}
}
/**
* Return the list of entity guids for the given type in the repository.
*
* @param entityType type
* @return list of entity guids for the given type in the repository
*/
@Override
public List<String> getEntityList(String entityType) throws AtlasException {
validateTypeExists(entityType);
return repository.getEntityList(entityType);
}
/**
* Updates an entity, instance of the type based on the guid set.
*
* @param entityInstanceDefinition json array of entity definitions
* @return guids - json array of guids
*/
@Override
public AtlasClient.EntityResult updateEntities(String entityInstanceDefinition) throws AtlasException {
ParamChecker.notEmpty(entityInstanceDefinition, "Entity instance definition");
ITypedReferenceableInstance[] typedInstances = deserializeClassInstances(entityInstanceDefinition);
AtlasClient.EntityResult entityResult = repository.updateEntities(typedInstances);
onEntitiesAddedUpdated(entityResult);
return entityResult;
}
private void onEntitiesAddedUpdated(AtlasClient.EntityResult entityResult) throws AtlasException {
onEntitiesAdded(entityResult.getCreatedEntities());
onEntitiesUpdated(entityResult.getUpdateEntities());
//Note: doesn't access deletedEntities from entityResult
onEntitiesDeleted(RequestContext.get().getDeletedEntities());
}
@Override
public AtlasClient.EntityResult updateEntityAttributeByGuid(final String guid, String attributeName,
String value) throws AtlasException {
ParamChecker.notEmpty(guid, "entity id");
ParamChecker.notEmpty(attributeName, "attribute name");
ParamChecker.notEmpty(value, "attribute value");
ITypedReferenceableInstance existInstance = validateEntityExists(guid);
ClassType type = typeSystem.getDataType(ClassType.class, existInstance.getTypeName());
ITypedReferenceableInstance newInstance = type.createInstance();
AttributeInfo attributeInfo = type.fieldMapping.fields.get(attributeName);
if (attributeInfo == null) {
throw new AtlasException("Invalid property " + attributeName + " for entity " + existInstance.getTypeName());
}
DataTypes.TypeCategory attrTypeCategory = attributeInfo.dataType().getTypeCategory();
switch(attrTypeCategory) {
case PRIMITIVE:
newInstance.set(attributeName, value);
break;
case CLASS:
Id id = new Id(value, 0, attributeInfo.dataType().getName());
newInstance.set(attributeName, id);
break;
default:
throw new AtlasException("Update of " + attrTypeCategory + " is not supported");
}
((ReferenceableInstance)newInstance).replaceWithNewId(new Id(guid, 0, newInstance.getTypeName()));
AtlasClient.EntityResult entityResult = repository.updatePartial(newInstance);
onEntitiesAddedUpdated(entityResult);
return entityResult;
}
private ITypedReferenceableInstance validateEntityExists(String guid)
throws EntityNotFoundException, RepositoryException {
final ITypedReferenceableInstance instance = repository.getEntityDefinition(guid);
if (instance == null) {
throw new EntityNotFoundException(String.format("Entity with guid %s not found ", guid));
}
return instance;
}
@Override
public AtlasClient.EntityResult updateEntityPartialByGuid(final String guid, Referenceable newEntity)
throws AtlasException {
ParamChecker.notEmpty(guid, "guid cannot be null");
ParamChecker.notNull(newEntity, "updatedEntity cannot be null");
ITypedReferenceableInstance existInstance = validateEntityExists(guid);
ITypedReferenceableInstance newInstance = convertToTypedInstance(newEntity, existInstance.getTypeName());
((ReferenceableInstance)newInstance).replaceWithNewId(new Id(guid, 0, newInstance.getTypeName()));
AtlasClient.EntityResult entityResult = repository.updatePartial(newInstance);
onEntitiesAddedUpdated(entityResult);
return entityResult;
}
private ITypedReferenceableInstance convertToTypedInstance(Referenceable updatedEntity, String typeName)
throws AtlasException {
ClassType type = typeSystem.getDataType(ClassType.class, typeName);
ITypedReferenceableInstance newInstance = type.createInstance();
for (String attributeName : updatedEntity.getValuesMap().keySet()) {
AttributeInfo attributeInfo = type.fieldMapping.fields.get(attributeName);
if (attributeInfo == null) {
throw new AtlasException("Invalid property " + attributeName + " for entity " + updatedEntity);
}
DataTypes.TypeCategory attrTypeCategory = attributeInfo.dataType().getTypeCategory();
Object value = updatedEntity.get(attributeName);
if (value != null) {
switch (attrTypeCategory) {
case CLASS:
if (value instanceof Referenceable) {
newInstance.set(attributeName, value);
} else {
Id id = new Id((String) value, 0, attributeInfo.dataType().getName());
newInstance.set(attributeName, id);
}
break;
case ENUM:
case PRIMITIVE:
case ARRAY:
case STRUCT:
case MAP:
newInstance.set(attributeName, value);
break;
case TRAIT:
//TODO - handle trait updates as well?
default:
throw new AtlasException("Update of " + attrTypeCategory + " is not supported");
}
}
}
return newInstance;
}
@Override
public AtlasClient.EntityResult updateEntityByUniqueAttribute(String typeName, String uniqueAttributeName,
String attrValue,
Referenceable updatedEntity) throws AtlasException {
ParamChecker.notEmpty(typeName, "typeName");
ParamChecker.notEmpty(uniqueAttributeName, "uniqueAttributeName");
ParamChecker.notNull(attrValue, "unique attribute value");
ParamChecker.notNull(updatedEntity, "updatedEntity");
ITypedReferenceableInstance oldInstance = getEntityDefinitionReference(typeName, uniqueAttributeName, attrValue);
final ITypedReferenceableInstance newInstance = convertToTypedInstance(updatedEntity, typeName);
((ReferenceableInstance)newInstance).replaceWithNewId(oldInstance.getId());
AtlasClient.EntityResult entityResult = repository.updatePartial(newInstance);
onEntitiesAddedUpdated(entityResult);
return entityResult;
}
private void validateTypeExists(String entityType) throws AtlasException {
ParamChecker.notEmpty(entityType, "entity type");
IDataType type = typeSystem.getDataType(IDataType.class, entityType);
if (type.getTypeCategory() != DataTypes.TypeCategory.CLASS) {
throw new IllegalArgumentException("type " + entityType + " not a CLASS type");
}
}
/**
* Gets the list of trait names for a given entity represented by a guid.
*
* @param guid globally unique identifier for the entity
* @return a list of trait names for the given entity guid
* @throws AtlasException
*/
@Override
public List<String> getTraitNames(String guid) throws AtlasException {
ParamChecker.notEmpty(guid, "entity id");
return repository.getTraitNames(guid);
}
/**
* Adds a new trait to an existing entity represented by a guid.
*
* @param guid globally unique identifier for the entity
* @param traitInstanceDefinition trait instance json that needs to be added to entity
* @throws AtlasException
*/
@Override
public void addTrait(String guid, String traitInstanceDefinition) throws AtlasException {
ParamChecker.notEmpty(guid, "entity id");
ParamChecker.notEmpty(traitInstanceDefinition, "trait instance definition");
ITypedStruct traitInstance = deserializeTraitInstance(traitInstanceDefinition);
addTrait(guid, traitInstance);
}
public void addTrait(String guid, ITypedStruct traitInstance) throws AtlasException {
final String traitName = traitInstance.getTypeName();
// ensure trait type is already registered with the TS
if (!typeSystem.isRegistered(traitName)) {
String msg = String.format("trait=%s should be defined in type system before it can be added", traitName);
LOG.error(msg);
throw new TypeNotFoundException(msg);
}
// ensure trait is not already defined
Preconditions
.checkArgument(!getTraitNames(guid).contains(traitName), "trait=%s is already defined for entity=%s",
traitName, guid);
repository.addTrait(guid, traitInstance);
onTraitAddedToEntity(repository.getEntityDefinition(guid), traitInstance);
}
private ITypedStruct deserializeTraitInstance(String traitInstanceDefinition)
throws AtlasException {
return createTraitInstance(InstanceSerialization.fromJsonStruct(traitInstanceDefinition, true));
}
@Override
public ITypedStruct createTraitInstance(Struct traitInstance) throws AtlasException {
try {
final String entityTypeName = traitInstance.getTypeName();
ParamChecker.notEmpty(entityTypeName, "entity type");
TraitType traitType = typeSystem.getDataType(TraitType.class, entityTypeName);
return traitType.convert(traitInstance, Multiplicity.REQUIRED);
} catch (TypeNotFoundException e) {
throw e;
} catch (Exception e) {
throw new AtlasException("Error deserializing trait instance", e);
}
}
/**
* Deletes a given trait from an existing entity represented by a guid.
*
* @param guid globally unique identifier for the entity
* @param traitNameToBeDeleted name of the trait
* @throws AtlasException
*/
@Override
public void deleteTrait(String guid, String traitNameToBeDeleted) throws AtlasException {
ParamChecker.notEmpty(guid, "entity id");
ParamChecker.notEmpty(traitNameToBeDeleted, "trait name");
// ensure trait type is already registered with the TS
if (!typeSystem.isRegistered(traitNameToBeDeleted)) {
final String msg = String.format("trait=%s should be defined in type system before it can be deleted",
traitNameToBeDeleted);
LOG.error(msg);
throw new TypeNotFoundException(msg);
}
repository.deleteTrait(guid, traitNameToBeDeleted);
onTraitDeletedFromEntity(repository.getEntityDefinition(guid), traitNameToBeDeleted);
}
private void onTypesAdded(Map<String, IDataType> typesAdded) throws AtlasException {
for (TypesChangeListener listener : typeChangeListeners) {
listener.onAdd(typesAdded.values());
}
}
private void onEntitiesAdded(List<String> guids) throws AtlasException {
List<ITypedReferenceableInstance> entities = loadEntities(guids);
for (EntityChangeListener listener : entityChangeListeners) {
listener.onEntitiesAdded(entities);
}
}
private List<ITypedReferenceableInstance> loadEntities(List<String> guids) throws EntityNotFoundException,
RepositoryException {
List<ITypedReferenceableInstance> entities = new ArrayList<>();
for (String guid : guids) {
entities.add(repository.getEntityDefinition(guid));
}
return entities;
}
private void onTypesUpdated(Map<String, IDataType> typesUpdated) throws AtlasException {
for (TypesChangeListener listener : typeChangeListeners) {
listener.onChange(typesUpdated.values());
}
}
private void onEntitiesUpdated(List<String> guids) throws AtlasException {
List<ITypedReferenceableInstance> entities = loadEntities(guids);
for (EntityChangeListener listener : entityChangeListeners) {
listener.onEntitiesUpdated(entities);
}
}
private void onTraitAddedToEntity(ITypedReferenceableInstance entity, IStruct trait) throws AtlasException {
for (EntityChangeListener listener : entityChangeListeners) {
listener.onTraitAdded(entity, trait);
}
}
private void onTraitDeletedFromEntity(ITypedReferenceableInstance entity, String traitName) throws AtlasException {
for (EntityChangeListener listener : entityChangeListeners) {
listener.onTraitDeleted(entity, traitName);
}
}
public void registerListener(EntityChangeListener listener) {
entityChangeListeners.add(listener);
}
public void unregisterListener(EntityChangeListener listener) {
entityChangeListeners.remove(listener);
}
@Override
public List<EntityAuditEvent> getAuditEvents(String guid, String startKey, short count) throws AtlasException {
ParamChecker.notEmpty(guid, "entity id");
ParamChecker.notEmptyIfNotNull(startKey, "start key");
ParamChecker.lessThan(count, maxAuditResults, "count");
return auditRepository.listEvents(guid, startKey, count);
}
/* (non-Javadoc)
* @see org.apache.atlas.services.MetadataService#deleteEntities(java.lang.String)
*/
@Override
public AtlasClient.EntityResult deleteEntities(List<String> deleteCandidateGuids) throws AtlasException {
ParamChecker.notEmpty(deleteCandidateGuids, "delete candidate guids");
return deleteGuids(deleteCandidateGuids);
}
@Override
public AtlasClient.EntityResult deleteEntityByUniqueAttribute(String typeName, String uniqueAttributeName,
String attrValue) throws AtlasException {
ParamChecker.notEmpty(typeName, "delete candidate typeName");
ParamChecker.notEmpty(uniqueAttributeName, "delete candidate unique attribute name");
ParamChecker.notEmpty(attrValue, "delete candidate unique attribute value");
//Throws EntityNotFoundException if the entity could not be found by its unique attribute
ITypedReferenceableInstance instance = getEntityDefinitionReference(typeName, uniqueAttributeName, attrValue);
final Id instanceId = instance.getId();
List<String> deleteCandidateGuids = new ArrayList<String>() {{ add(instanceId._getId());}};
return deleteGuids(deleteCandidateGuids);
}
private AtlasClient.EntityResult deleteGuids(List<String> deleteCandidateGuids) throws AtlasException {
AtlasClient.EntityResult entityResult = repository.deleteEntities(deleteCandidateGuids);
onEntitiesAddedUpdated(entityResult);
return entityResult;
}
private void onEntitiesDeleted(List<ITypedReferenceableInstance> entities) throws AtlasException {
for (EntityChangeListener listener : entityChangeListeners) {
listener.onEntitiesDeleted(entities);
}
}
/**
* Create or restore the {@link TypeSystem} cache on server activation.
*
* When an instance is passive, types could be created outside of its cache by the active instance.
* Hence, when this instance becomes active, it needs to restore the cache from the backend store.
* The first time initialization happens, the indices for these types also needs to be created.
* This must happen only from the active instance, as it updates shared backend state.
*/
@Override
public void instanceIsActive() throws AtlasException {
LOG.info("Reacting to active state: restoring type system");
restoreTypeSystem();
}
@Override
public void instanceIsPassive() {
LOG.info("Reacting to passive state: no action right now");
}
}