blob: 41de29f0c13100ec1a311ccff7b2b48efa36f56e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.store.graph;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.AtlasException;
import org.apache.atlas.GraphTransaction;
import org.apache.atlas.GraphTransactionInterceptor;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.listener.ActiveStateChangeHandler;
import org.apache.atlas.listener.ChangedTypeDefs;
import org.apache.atlas.listener.TypeDefChangeListener;
import org.apache.atlas.model.SearchFilter;
import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
import org.apache.atlas.model.typedef.AtlasClassificationDef;
import org.apache.atlas.model.typedef.AtlasEntityDef;
import org.apache.atlas.model.typedef.AtlasEnumDef;
import org.apache.atlas.model.typedef.AtlasStructDef;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasConstraintDef;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.store.bootstrap.AtlasTypeDefStoreInitializer;
import org.apache.atlas.repository.util.FilterUtil;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasClassificationType;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasEnumType;
import org.apache.atlas.type.AtlasStructType;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.type.AtlasTypeRegistry.AtlasTransientTypeRegistry;
import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.atlas.util.AtlasRepositoryConfiguration;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.Predicate;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
* Abstract class for graph persistence store for TypeDef
*/
public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, ActiveStateChangeHandler {
private static final Logger LOG = LoggerFactory.getLogger(AtlasTypeDefGraphStore.class);
private final AtlasTypeRegistry typeRegistry;
private final Set<TypeDefChangeListener> typeDefChangeListeners;
private final int typeUpdateLockMaxWaitTimeSeconds;
protected AtlasTypeDefGraphStore(AtlasTypeRegistry typeRegistry,
Set<TypeDefChangeListener> typeDefChangeListeners) {
this.typeRegistry = typeRegistry;
this.typeDefChangeListeners = typeDefChangeListeners;
this.typeUpdateLockMaxWaitTimeSeconds = AtlasRepositoryConfiguration.getTypeUpdateLockMaxWaitTimeInSeconds();
}
protected abstract AtlasEnumDefStore getEnumDefStore(AtlasTypeRegistry typeRegistry);
protected abstract AtlasStructDefStore getStructDefStore(AtlasTypeRegistry typeRegistry);
protected abstract AtlasClassificationDefStore getClassificationDefStore(AtlasTypeRegistry typeRegistry);
protected abstract AtlasEntityDefStore getEntityDefStore(AtlasTypeRegistry typeRegistry);
@Override
public void init() throws AtlasBaseException {
AtlasTransientTypeRegistry ttr = null;
boolean commitUpdates = false;
try {
ttr = typeRegistry.lockTypeRegistryForUpdate(typeUpdateLockMaxWaitTimeSeconds);
ttr.clear();
AtlasTypesDef typesDef = new AtlasTypesDef(getEnumDefStore(ttr).getAll(),
getStructDefStore(ttr).getAll(),
getClassificationDefStore(ttr).getAll(),
getEntityDefStore(ttr).getAll());
rectifyTypeErrorsIfAny(typesDef);
ttr.addTypes(typesDef);
commitUpdates = true;
} finally {
typeRegistry.releaseTypeRegistryForUpdate(ttr, commitUpdates);
}
bootstrapTypes();
}
@Override
@GraphTransaction
public AtlasEnumDef getEnumDefByName(String name) throws AtlasBaseException {
AtlasEnumDef ret = typeRegistry.getEnumDefByName(name);
if (ret == null) {
throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, name);
}
return ret;
}
@Override
@GraphTransaction
public AtlasEnumDef getEnumDefByGuid(String guid) throws AtlasBaseException {
AtlasEnumDef ret = typeRegistry.getEnumDefByGuid(guid);
if (ret == null) {
throw new AtlasBaseException(AtlasErrorCode.TYPE_GUID_NOT_FOUND, guid);
}
return ret;
}
@Override
@GraphTransaction
public AtlasEnumDef updateEnumDefByName(String name, AtlasEnumDef enumDef) throws AtlasBaseException {
AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();
tryUpdateByName(name, enumDef, ttr);
return getEnumDefStore(ttr).updateByName(name, enumDef);
}
@Override
@GraphTransaction
public AtlasEnumDef updateEnumDefByGuid(String guid, AtlasEnumDef enumDef) throws AtlasBaseException {
AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();
tryUpdateByGUID(guid, enumDef, ttr);
return getEnumDefStore(ttr).updateByGuid(guid, enumDef);
}
@Override
@GraphTransaction
public AtlasStructDef getStructDefByName(String name) throws AtlasBaseException {
AtlasStructDef ret = typeRegistry.getStructDefByName(name);
if (ret == null) {
throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, name);
}
return ret;
}
@Override
@GraphTransaction
public AtlasStructDef getStructDefByGuid(String guid) throws AtlasBaseException {
AtlasStructDef ret = typeRegistry.getStructDefByGuid(guid);
if (ret == null) {
throw new AtlasBaseException(AtlasErrorCode.TYPE_GUID_NOT_FOUND, guid);
}
return ret;
}
@Override
@GraphTransaction
public AtlasStructDef updateStructDefByName(String name, AtlasStructDef structDef) throws AtlasBaseException {
AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();
tryUpdateByName(name, structDef, ttr);
return getStructDefStore(ttr).updateByName(name, structDef);
}
@Override
@GraphTransaction
public AtlasStructDef updateStructDefByGuid(String guid, AtlasStructDef structDef) throws AtlasBaseException {
AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();
tryUpdateByGUID(guid, structDef, ttr);
return getStructDefStore(ttr).updateByGuid(guid, structDef);
}
@Override
@GraphTransaction
public AtlasClassificationDef getClassificationDefByName(String name) throws AtlasBaseException {
AtlasClassificationDef ret = typeRegistry.getClassificationDefByName(name);
if (ret == null) {
throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, name);
}
return ret;
}
@Override
@GraphTransaction
public AtlasClassificationDef getClassificationDefByGuid(String guid) throws AtlasBaseException {
AtlasClassificationDef ret = typeRegistry.getClassificationDefByGuid(guid);
if (ret == null) {
throw new AtlasBaseException(AtlasErrorCode.TYPE_GUID_NOT_FOUND, guid);
}
return ret;
}
@Override
@GraphTransaction
public AtlasClassificationDef updateClassificationDefByName(String name, AtlasClassificationDef classificationDef)
throws AtlasBaseException {
AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();
tryUpdateByName(name, classificationDef, ttr);
return getClassificationDefStore(ttr).updateByName(name, classificationDef);
}
@Override
@GraphTransaction
public AtlasClassificationDef updateClassificationDefByGuid(String guid, AtlasClassificationDef classificationDef)
throws AtlasBaseException {
AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();
tryUpdateByGUID(guid, classificationDef, ttr);
return getClassificationDefStore(ttr).updateByGuid(guid, classificationDef);
}
@Override
@GraphTransaction
public AtlasEntityDef getEntityDefByName(String name) throws AtlasBaseException {
AtlasEntityDef ret = typeRegistry.getEntityDefByName(name);
if (ret == null) {
throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, name);
}
return ret;
}
@Override
@GraphTransaction
public AtlasEntityDef getEntityDefByGuid(String guid) throws AtlasBaseException {
AtlasEntityDef ret = typeRegistry.getEntityDefByGuid(guid);
if (ret == null) {
throw new AtlasBaseException(AtlasErrorCode.TYPE_GUID_NOT_FOUND, guid);
}
return ret;
}
@Override
@GraphTransaction
public AtlasEntityDef updateEntityDefByName(String name, AtlasEntityDef entityDef) throws AtlasBaseException {
AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();
tryUpdateByName(name, entityDef, ttr);
return getEntityDefStore(ttr).updateByName(name, entityDef);
}
@Override
@GraphTransaction
public AtlasEntityDef updateEntityDefByGuid(String guid, AtlasEntityDef entityDef) throws AtlasBaseException {
AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();
tryUpdateByGUID(guid, entityDef, ttr);
return getEntityDefStore(ttr).updateByGuid(guid, entityDef);
}
@Override
@GraphTransaction
public AtlasTypesDef createTypesDef(AtlasTypesDef typesDef) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> AtlasTypeDefGraphStore.createTypesDef(enums={}, structs={}, classifications={}, entities={})",
CollectionUtils.size(typesDef.getEnumDefs()),
CollectionUtils.size(typesDef.getStructDefs()),
CollectionUtils.size(typesDef.getClassificationDefs()),
CollectionUtils.size(typesDef.getEntityDefs()));
}
AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();
tryTypeCreation(typesDef, ttr);
AtlasTypesDef ret = addToGraphStore(typesDef, ttr);
if (LOG.isDebugEnabled()) {
LOG.debug("<== AtlasTypeDefGraphStore.createTypesDef(enums={}, structs={}, classfications={}, entities={})",
CollectionUtils.size(typesDef.getEnumDefs()),
CollectionUtils.size(typesDef.getStructDefs()),
CollectionUtils.size(typesDef.getClassificationDefs()),
CollectionUtils.size(typesDef.getEntityDefs()));
}
return ret;
}
@Override
@GraphTransaction
public AtlasTypesDef createUpdateTypesDef(AtlasTypesDef typesToCreate, AtlasTypesDef typesToUpdate) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> AtlasTypeDefGraphStore.createUpdateTypesDef({}, {})", typesToCreate, typesToUpdate);
}
AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();
if (!typesToUpdate.isEmpty()) {
ttr.updateTypesWithNoRefResolve(typesToUpdate);
}
// Translate any NOT FOUND errors to BAD REQUEST
tryTypeCreation(typesToCreate, ttr);
AtlasTypesDef ret = addToGraphStore(typesToCreate, ttr);
if (!typesToUpdate.isEmpty()) {
AtlasTypesDef updatedTypes = updateGraphStore(typesToUpdate, ttr);
if (CollectionUtils.isNotEmpty(updatedTypes.getEnumDefs())) {
for (AtlasEnumDef enumDef : updatedTypes.getEnumDefs()) {
ret.getEnumDefs().add(enumDef);
}
}
if (CollectionUtils.isNotEmpty(updatedTypes.getStructDefs())) {
for (AtlasStructDef structDef : updatedTypes.getStructDefs()) {
ret.getStructDefs().add(structDef);
}
}
if (CollectionUtils.isNotEmpty(updatedTypes.getClassificationDefs())) {
for (AtlasClassificationDef classificationDef : updatedTypes.getClassificationDefs()) {
ret.getClassificationDefs().add(classificationDef);
}
}
if (CollectionUtils.isNotEmpty(updatedTypes.getEntityDefs())) {
for (AtlasEntityDef entityDef : updatedTypes.getEntityDefs()) {
ret.getEntityDefs().add(entityDef);
}
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== AtlasTypeDefGraphStore.createUpdateTypesDef({}, {}): {}", typesToCreate, typesToUpdate, ret);
}
return ret;
}
@Override
@GraphTransaction
public AtlasTypesDef updateTypesDef(AtlasTypesDef typesDef) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> AtlasTypeDefGraphStore.updateTypesDef(enums={}, structs={}, classfications={}, entities={})",
CollectionUtils.size(typesDef.getEnumDefs()),
CollectionUtils.size(typesDef.getStructDefs()),
CollectionUtils.size(typesDef.getClassificationDefs()),
CollectionUtils.size(typesDef.getEntityDefs()));
}
AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();
// Translate any NOT FOUND errors to BAD REQUEST
try {
ttr.updateTypes(typesDef);
} catch (AtlasBaseException e) {
if (AtlasErrorCode.TYPE_NAME_NOT_FOUND == e.getAtlasErrorCode()) {
throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, e.getMessage());
} else {
throw e;
}
}
AtlasTypesDef ret = updateGraphStore(typesDef, ttr);
if (LOG.isDebugEnabled()) {
LOG.debug("<== AtlasTypeDefGraphStore.updateTypesDef(enums={}, structs={}, classfications={}, entities={})",
CollectionUtils.size(typesDef.getEnumDefs()),
CollectionUtils.size(typesDef.getStructDefs()),
CollectionUtils.size(typesDef.getClassificationDefs()),
CollectionUtils.size(typesDef.getEntityDefs()));
}
return ret;
}
@Override
@GraphTransaction
public void deleteTypesDef(AtlasTypesDef typesDef) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> AtlasTypeDefGraphStore.deleteTypesDef(enums={}, structs={}, classfications={}, entities={})",
CollectionUtils.size(typesDef.getEnumDefs()),
CollectionUtils.size(typesDef.getStructDefs()),
CollectionUtils.size(typesDef.getClassificationDefs()),
CollectionUtils.size(typesDef.getEntityDefs()));
}
AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();
AtlasEnumDefStore enumDefStore = getEnumDefStore(ttr);
AtlasStructDefStore structDefStore = getStructDefStore(ttr);
AtlasClassificationDefStore classifiDefStore = getClassificationDefStore(ttr);
AtlasEntityDefStore entityDefStore = getEntityDefStore(ttr);
List<Object> preDeleteStructDefs = new ArrayList<>();
List<Object> preDeleteClassifiDefs = new ArrayList<>();
List<Object> preDeleteEntityDefs = new ArrayList<>();
if (CollectionUtils.isNotEmpty(typesDef.getStructDefs())) {
for (AtlasStructDef structDef : typesDef.getStructDefs()) {
if (StringUtils.isNotBlank(structDef.getGuid())) {
preDeleteStructDefs.add(structDefStore.preDeleteByGuid(structDef.getGuid()));
} else {
preDeleteStructDefs.add(structDefStore.preDeleteByName(structDef.getName()));
}
}
}
if (CollectionUtils.isNotEmpty(typesDef.getClassificationDefs())) {
for (AtlasClassificationDef classifiDef : typesDef.getClassificationDefs()) {
if (StringUtils.isNotBlank(classifiDef.getGuid())) {
preDeleteClassifiDefs.add(classifiDefStore.preDeleteByGuid(classifiDef.getGuid()));
} else {
preDeleteClassifiDefs.add(classifiDefStore.preDeleteByName(classifiDef.getName()));
}
}
}
if (CollectionUtils.isNotEmpty(typesDef.getEntityDefs())) {
for (AtlasEntityDef entityDef : typesDef.getEntityDefs()) {
if (StringUtils.isNotBlank(entityDef.getGuid())) {
preDeleteEntityDefs.add(entityDefStore.preDeleteByGuid(entityDef.getGuid()));
} else {
preDeleteEntityDefs.add(entityDefStore.preDeleteByName(entityDef.getName()));
}
}
}
if (CollectionUtils.isNotEmpty(typesDef.getStructDefs())) {
int i = 0;
for (AtlasStructDef structDef : typesDef.getStructDefs()) {
if (StringUtils.isNotBlank(structDef.getGuid())) {
structDefStore.deleteByGuid(structDef.getGuid(), preDeleteStructDefs.get(i));
} else {
structDefStore.deleteByName(structDef.getName(), preDeleteStructDefs.get(i));
}
i++;
}
}
if (CollectionUtils.isNotEmpty(typesDef.getClassificationDefs())) {
int i = 0;
for (AtlasClassificationDef classifiDef : typesDef.getClassificationDefs()) {
if (StringUtils.isNotBlank(classifiDef.getGuid())) {
classifiDefStore.deleteByGuid(classifiDef.getGuid(), preDeleteClassifiDefs.get(i));
} else {
classifiDefStore.deleteByName(classifiDef.getName(), preDeleteClassifiDefs.get(i));
}
i++;
}
}
if (CollectionUtils.isNotEmpty(typesDef.getEntityDefs())) {
int i = 0;
for (AtlasEntityDef entityDef : typesDef.getEntityDefs()) {
if (StringUtils.isNotBlank(entityDef.getGuid())) {
entityDefStore.deleteByGuid(entityDef.getGuid(), preDeleteEntityDefs.get(i));
} else {
entityDefStore.deleteByName(entityDef.getName(), preDeleteEntityDefs.get(i));
}
i++;
}
}
if (CollectionUtils.isNotEmpty(typesDef.getEnumDefs())) {
for (AtlasEnumDef enumDef : typesDef.getEnumDefs()) {
if (StringUtils.isNotBlank(enumDef.getGuid())) {
enumDefStore.deleteByGuid(enumDef.getGuid());
} else {
enumDefStore.deleteByName(enumDef.getName());
}
}
}
// Remove all from
ttr.removeTypesDef(typesDef);
if (LOG.isDebugEnabled()) {
LOG.debug("<== AtlasTypeDefGraphStore.deleteTypesDef(enums={}, structs={}, classfications={}, entities={})",
CollectionUtils.size(typesDef.getEnumDefs()),
CollectionUtils.size(typesDef.getStructDefs()),
CollectionUtils.size(typesDef.getClassificationDefs()),
CollectionUtils.size(typesDef.getEntityDefs()));
}
}
@Override
@GraphTransaction
public AtlasTypesDef searchTypesDef(SearchFilter searchFilter) throws AtlasBaseException {
final AtlasTypesDef typesDef = new AtlasTypesDef();
Predicate searchPredicates = FilterUtil.getPredicateFromSearchFilter(searchFilter);
for(AtlasEnumType enumType : typeRegistry.getAllEnumTypes()) {
if (searchPredicates.evaluate(enumType)) {
typesDef.getEnumDefs().add(enumType.getEnumDef());
}
}
for(AtlasStructType structType : typeRegistry.getAllStructTypes()) {
if (searchPredicates.evaluate(structType)) {
typesDef.getStructDefs().add(structType.getStructDef());
}
}
for(AtlasClassificationType classificationType : typeRegistry.getAllClassificationTypes()) {
if (searchPredicates.evaluate(classificationType)) {
typesDef.getClassificationDefs().add(classificationType.getClassificationDef());
}
}
for(AtlasEntityType entityType : typeRegistry.getAllEntityTypes()) {
if (searchPredicates.evaluate(entityType)) {
typesDef.getEntityDefs().add(entityType.getEntityDef());
}
}
return typesDef;
}
@Override
@GraphTransaction
public AtlasBaseTypeDef getByName(String name) throws AtlasBaseException {
if (StringUtils.isBlank(name)) {
throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_INVALID, "", name);
}
AtlasType type = typeRegistry.getType(name);
return getTypeDefFromType(type);
}
@Override
@GraphTransaction
public AtlasBaseTypeDef getByGuid(String guid) throws AtlasBaseException {
if (StringUtils.isBlank(guid)) {
throw new AtlasBaseException(AtlasErrorCode.TYPE_GUID_NOT_FOUND, guid);
}
AtlasType type = typeRegistry.getTypeByGuid(guid);
return getTypeDefFromType(type);
}
@Override
public void instanceIsActive() throws AtlasException {
try {
init();
} catch (AtlasBaseException e) {
LOG.error("Failed to init after becoming active", e);
}
}
@Override
public void instanceIsPassive() throws AtlasException {
LOG.info("Not reacting to a Passive state change");
}
private void bootstrapTypes() {
AtlasTypeDefStoreInitializer storeInitializer = new AtlasTypeDefStoreInitializer();
String atlasHomeDir = System.getProperty("atlas.home");
String typesDirName = (StringUtils.isEmpty(atlasHomeDir) ? "." : atlasHomeDir) + File.separator + "models";
storeInitializer.initializeStore(this, typeRegistry, typesDirName);
}
private AtlasBaseTypeDef getTypeDefFromType(AtlasType type) throws AtlasBaseException {
AtlasBaseTypeDef ret;
switch (type.getTypeCategory()) {
case ENUM:
ret = ((AtlasEnumType) type).getEnumDef();
break;
case STRUCT:
ret = ((AtlasStructType) type).getStructDef();
break;
case CLASSIFICATION:
ret = ((AtlasClassificationType) type).getClassificationDef();
break;
case ENTITY:
ret = ((AtlasEntityType) type).getEntityDef();
break;
case PRIMITIVE:
case OBJECT_ID_TYPE:
case ARRAY:
case MAP:
default:
throw new AtlasBaseException(AtlasErrorCode.SYSTEM_TYPE, type.getTypeCategory().name());
}
return ret;
}
private AtlasTransientTypeRegistry lockTypeRegistryAndReleasePostCommit() throws AtlasBaseException {
AtlasTransientTypeRegistry ttr = typeRegistry.lockTypeRegistryForUpdate(typeUpdateLockMaxWaitTimeSeconds);
new TypeRegistryUpdateHook(ttr);
return ttr;
}
private void rectifyTypeErrorsIfAny(AtlasTypesDef typesDef) {
final Set<String> entityNames = new HashSet<>();
if (CollectionUtils.isNotEmpty(typesDef.getEntityDefs())) {
for (AtlasEntityDef entityDef : typesDef.getEntityDefs()) {
entityNames.add(entityDef.getName());
}
}
if (CollectionUtils.isNotEmpty(typesDef.getStructDefs())) {
for (AtlasStructDef structDef : typesDef.getStructDefs()) {
rectifyAttributesIfNeeded(entityNames, structDef);
}
}
if (CollectionUtils.isNotEmpty(typesDef.getClassificationDefs())) {
for (AtlasClassificationDef classificationDef : typesDef.getClassificationDefs()) {
rectifyAttributesIfNeeded(entityNames, classificationDef);
}
}
if (CollectionUtils.isNotEmpty(typesDef.getEntityDefs())) {
for (AtlasEntityDef entityDef : typesDef.getEntityDefs()) {
rectifyAttributesIfNeeded(entityNames, entityDef);
}
}
}
private void rectifyAttributesIfNeeded(final Set<String> entityNames, AtlasStructDef structDef) {
List<AtlasAttributeDef> attributeDefs = structDef.getAttributeDefs();
if (CollectionUtils.isNotEmpty(attributeDefs)) {
for (AtlasAttributeDef attributeDef : attributeDefs) {
if (!hasOwnedReferenceConstraint(attributeDef.getConstraints())) {
continue;
}
Set<String> referencedTypeNames = AtlasTypeUtil.getReferencedTypeNames(attributeDef.getTypeName());
boolean valid = false;
for (String referencedTypeName : referencedTypeNames) {
if (entityNames.contains(referencedTypeName)) {
valid = true;
break;
}
}
if (!valid) {
rectifyOwnedReferenceError(structDef, attributeDef);
}
}
}
}
private boolean hasOwnedReferenceConstraint(List<AtlasConstraintDef> constraints) {
if (CollectionUtils.isNotEmpty(constraints)) {
for (AtlasConstraintDef constraint : constraints) {
if (constraint.isConstraintType(AtlasConstraintDef.CONSTRAINT_TYPE_OWNED_REF)) {
return true;
}
}
}
return false;
}
private void rectifyOwnedReferenceError(AtlasStructDef structDef, AtlasAttributeDef attributeDef) {
List<AtlasConstraintDef> constraints = attributeDef.getConstraints();
if (CollectionUtils.isNotEmpty(constraints)) {
for (int i = 0; i < constraints.size(); i++) {
AtlasConstraintDef constraint = constraints.get(i);
if (constraint.isConstraintType(AtlasConstraintDef.CONSTRAINT_TYPE_OWNED_REF)) {
LOG.warn("Invalid constraint ownedRef for attribute {}.{}", structDef.getName(), attributeDef.getName());
constraints.remove(i);
i--;
}
}
}
}
private AtlasTypesDef addToGraphStore(AtlasTypesDef typesDef, AtlasTransientTypeRegistry ttr) throws AtlasBaseException {
AtlasTypesDef ret = new AtlasTypesDef();
AtlasEnumDefStore enumDefStore = getEnumDefStore(ttr);
AtlasStructDefStore structDefStore = getStructDefStore(ttr);
AtlasClassificationDefStore classifiDefStore = getClassificationDefStore(ttr);
AtlasEntityDefStore entityDefStore = getEntityDefStore(ttr);
List<Object> preCreateStructDefs = new ArrayList<>();
List<Object> preCreateClassifiDefs = new ArrayList<>();
List<Object> preCreateEntityDefs = new ArrayList<>();
if (CollectionUtils.isNotEmpty(typesDef.getEnumDefs())) {
for (AtlasEnumDef enumDef : typesDef.getEnumDefs()) {
AtlasEnumDef createdDef = enumDefStore.create(enumDef);
ttr.updateGuid(createdDef.getName(), createdDef.getGuid());
ret.getEnumDefs().add(createdDef);
}
}
if (CollectionUtils.isNotEmpty(typesDef.getStructDefs())) {
for (AtlasStructDef structDef : typesDef.getStructDefs()) {
preCreateStructDefs.add(structDefStore.preCreate(structDef));
}
}
if (CollectionUtils.isNotEmpty(typesDef.getClassificationDefs())) {
for (AtlasClassificationDef classifiDef : typesDef.getClassificationDefs()) {
preCreateClassifiDefs.add(classifiDefStore.preCreate(classifiDef));
}
}
if (CollectionUtils.isNotEmpty(typesDef.getEntityDefs())) {
for (AtlasEntityDef entityDef : typesDef.getEntityDefs()) {
preCreateEntityDefs.add(entityDefStore.preCreate(entityDef));
}
}
if (CollectionUtils.isNotEmpty(typesDef.getStructDefs())) {
int i = 0;
for (AtlasStructDef structDef : typesDef.getStructDefs()) {
AtlasStructDef createdDef = structDefStore.create(structDef, preCreateStructDefs.get(i));
ttr.updateGuid(createdDef.getName(), createdDef.getGuid());
ret.getStructDefs().add(createdDef);
i++;
}
}
if (CollectionUtils.isNotEmpty(typesDef.getClassificationDefs())) {
int i = 0;
for (AtlasClassificationDef classifiDef : typesDef.getClassificationDefs()) {
AtlasClassificationDef createdDef = classifiDefStore.create(classifiDef, preCreateClassifiDefs.get(i));
ttr.updateGuid(createdDef.getName(), createdDef.getGuid());
ret.getClassificationDefs().add(createdDef);
i++;
}
}
if (CollectionUtils.isNotEmpty(typesDef.getEntityDefs())) {
int i = 0;
for (AtlasEntityDef entityDef : typesDef.getEntityDefs()) {
AtlasEntityDef createdDef = entityDefStore.create(entityDef, preCreateEntityDefs.get(i));
ttr.updateGuid(createdDef.getName(), createdDef.getGuid());
ret.getEntityDefs().add(createdDef);
i++;
}
}
return ret;
}
private AtlasTypesDef updateGraphStore(AtlasTypesDef typesDef, AtlasTransientTypeRegistry ttr) throws AtlasBaseException {
AtlasTypesDef ret = new AtlasTypesDef();
AtlasEnumDefStore enumDefStore = getEnumDefStore(ttr);
AtlasStructDefStore structDefStore = getStructDefStore(ttr);
AtlasClassificationDefStore classifiDefStore = getClassificationDefStore(ttr);
AtlasEntityDefStore entityDefStore = getEntityDefStore(ttr);
if (CollectionUtils.isNotEmpty(typesDef.getEnumDefs())) {
for (AtlasEnumDef enumDef : typesDef.getEnumDefs()) {
ret.getEnumDefs().add(enumDefStore.update(enumDef));
}
}
if (CollectionUtils.isNotEmpty(typesDef.getStructDefs())) {
for (AtlasStructDef structDef : typesDef.getStructDefs()) {
ret.getStructDefs().add(structDefStore.update(structDef));
}
}
if (CollectionUtils.isNotEmpty(typesDef.getClassificationDefs())) {
for (AtlasClassificationDef classifiDef : typesDef.getClassificationDefs()) {
ret.getClassificationDefs().add(classifiDefStore.update(classifiDef));
}
}
if (CollectionUtils.isNotEmpty(typesDef.getEntityDefs())) {
for (AtlasEntityDef entityDef : typesDef.getEntityDefs()) {
ret.getEntityDefs().add(entityDefStore.update(entityDef));
}
}
return ret;
}
private class TypeRegistryUpdateHook extends GraphTransactionInterceptor.PostTransactionHook {
private final AtlasTransientTypeRegistry ttr;
private TypeRegistryUpdateHook(AtlasTransientTypeRegistry ttr) {
super();
this.ttr = ttr;
}
@Override
public void onComplete(boolean isSuccess) {
if (LOG.isDebugEnabled()) {
LOG.debug("==> TypeRegistryUpdateHook.onComplete({})", isSuccess);
}
typeRegistry.releaseTypeRegistryForUpdate(ttr, isSuccess);
if (isSuccess) {
notifyListeners(ttr);
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== TypeRegistryUpdateHook.onComplete({})", isSuccess);
}
}
private void notifyListeners(AtlasTransientTypeRegistry ttr) {
if (CollectionUtils.isNotEmpty(typeDefChangeListeners)) {
ChangedTypeDefs changedTypeDefs = new ChangedTypeDefs(ttr.getAddedTypes(),
ttr.getUpdatedTypes(),
ttr.getDeleteedTypes());
for (TypeDefChangeListener changeListener : typeDefChangeListeners) {
try {
changeListener.onChange(changedTypeDefs);
} catch (Throwable t) {
LOG.error("OnChange failed for listener {}", changeListener.getClass().getName(), t);
}
}
}
}
}
private void tryUpdateByName(String name, AtlasBaseTypeDef typeDef, AtlasTransientTypeRegistry ttr) throws AtlasBaseException {
try {
ttr.updateTypeByName(name, typeDef);
} catch (AtlasBaseException e) {
if (AtlasErrorCode.TYPE_NAME_NOT_FOUND == e.getAtlasErrorCode()) {
throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, e.getMessage());
} else {
throw e;
}
}
}
private void tryUpdateByGUID(String guid, AtlasBaseTypeDef typeDef, AtlasTransientTypeRegistry ttr) throws AtlasBaseException {
try {
ttr.updateTypeByGuid(guid, typeDef);
} catch (AtlasBaseException e) {
if (AtlasErrorCode.TYPE_GUID_NOT_FOUND == e.getAtlasErrorCode()) {
throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, e.getMessage());
} else {
throw e;
}
}
}
private void tryTypeCreation(AtlasTypesDef typesDef, AtlasTransientTypeRegistry ttr) throws AtlasBaseException {
// Translate any NOT FOUND errors to BAD REQUEST
try {
ttr.addTypes(typesDef);
} catch (AtlasBaseException e) {
if (AtlasErrorCode.TYPE_NAME_NOT_FOUND == e.getAtlasErrorCode() ||
AtlasErrorCode.TYPE_GUID_NOT_FOUND == e.getAtlasErrorCode()) {
throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, e.getMessage());
} else {
throw e;
}
}
}
}