/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.atlas.repository.store.graph;

import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.AtlasException;
import org.apache.atlas.GraphTransaction;
import org.apache.atlas.GraphTransactionInterceptor;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.listener.ActiveStateChangeHandler;
import org.apache.atlas.listener.ChangedTypeDefs;
import org.apache.atlas.listener.TypeDefChangeListener;
import org.apache.atlas.model.SearchFilter;
import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
import org.apache.atlas.model.typedef.AtlasClassificationDef;
import org.apache.atlas.model.typedef.AtlasEntityDef;
import org.apache.atlas.model.typedef.AtlasEnumDef;
import org.apache.atlas.model.typedef.AtlasStructDef;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasConstraintDef;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.store.bootstrap.AtlasTypeDefStoreInitializer;
import org.apache.atlas.repository.util.FilterUtil;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasClassificationType;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasEnumType;
import org.apache.atlas.type.AtlasStructType;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.type.AtlasTypeRegistry.AtlasTransientTypeRegistry;
import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.atlas.util.AtlasRepositoryConfiguration;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.Predicate;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;


/**
 * Abstract base class for graph-backed persistence of type definitions (enum, struct,
 * classification and entity defs).
 *
 * <p>Read operations are answered from the in-memory {@link AtlasTypeRegistry}. Write operations
 * lock the registry for update, apply the change to the returned transient registry, persist it
 * through the per-category def stores supplied by subclasses, and hand the transient registry to a
 * {@link TypeRegistryUpdateHook}, whose {@code onComplete} callback releases the lock.</p>
 */
public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, ActiveStateChangeHandler {

    private static final Logger LOG = LoggerFactory.getLogger(AtlasTypeDefGraphStore.class);

    private final AtlasTypeRegistry          typeRegistry;
    private final Set<TypeDefChangeListener> typeDefChangeListeners;
    private final int                        typeUpdateLockMaxWaitTimeSeconds;

    /**
     * @param typeRegistry           registry that serves lookups and is locked for every update
     * @param typeDefChangeListeners listeners notified after a successful type change; may be empty
     */
    protected AtlasTypeDefGraphStore(AtlasTypeRegistry typeRegistry,
                                     Set<TypeDefChangeListener> typeDefChangeListeners) {
        this.typeRegistry                     = typeRegistry;
        this.typeDefChangeListeners           = typeDefChangeListeners;
        this.typeUpdateLockMaxWaitTimeSeconds = AtlasRepositoryConfiguration.getTypeUpdateLockMaxWaitTimeInSeconds();
    }

    /** @return the enum-def store bound to the given (possibly transient) registry */
    protected abstract AtlasEnumDefStore getEnumDefStore(AtlasTypeRegistry typeRegistry);

    /** @return the struct-def store bound to the given (possibly transient) registry */
    protected abstract AtlasStructDefStore getStructDefStore(AtlasTypeRegistry typeRegistry);

    /** @return the classification-def store bound to the given (possibly transient) registry */
    protected abstract AtlasClassificationDefStore getClassificationDefStore(AtlasTypeRegistry typeRegistry);

    /** @return the entity-def store bound to the given (possibly transient) registry */
    protected abstract AtlasEntityDefStore getEntityDefStore(AtlasTypeRegistry typeRegistry);

    /**
     * Rebuilds the type registry from the def stores and then bootstraps the model files.
     *
     * <p>The registry is locked, the transient registry is cleared, all defs are read from the four
     * stores, invalid ownedRef constraints are stripped, and the defs are added back. The lock is
     * always released; the update is committed only when every step succeeded.</p>
     *
     * @throws AtlasBaseException if locking, loading or registering the types fails
     */
    @Override
    public void init() throws AtlasBaseException {
        AtlasTransientTypeRegistry ttr           = null;
        boolean                    commitUpdates = false;

        try {
            ttr = typeRegistry.lockTypeRegistryForUpdate(typeUpdateLockMaxWaitTimeSeconds);

            ttr.clear();

            AtlasTypesDef typesDef = new AtlasTypesDef(getEnumDefStore(ttr).getAll(),
                    getStructDefStore(ttr).getAll(),
                    getClassificationDefStore(ttr).getAll(),
                    getEntityDefStore(ttr).getAll());

            rectifyTypeErrorsIfAny(typesDef);

            ttr.addTypes(typesDef);

            commitUpdates = true;
        } finally {
            // commitUpdates stays false when any step above threw, so a partial load is not committed
            typeRegistry.releaseTypeRegistryForUpdate(ttr, commitUpdates);
        }

        bootstrapTypes();
    }

    /**
     * @param name enum type name
     * @return the enum def registered under {@code name}
     * @throws AtlasBaseException with {@code TYPE_NAME_NOT_FOUND} when the registry returns null
     */
    @Override
    public AtlasEnumDef getEnumDefByName(String name) throws AtlasBaseException {
        AtlasEnumDef ret = typeRegistry.getEnumDefByName(name);
        if (ret == null) {
            throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, name);
        }
        return ret;
    }

    /**
     * @param guid enum type GUID
     * @return the enum def registered under {@code guid}
     * @throws AtlasBaseException with {@code TYPE_GUID_NOT_FOUND} when the registry returns null
     */
    @Override
    public AtlasEnumDef getEnumDefByGuid(String guid) throws AtlasBaseException {
        AtlasEnumDef ret = typeRegistry.getEnumDefByGuid(guid);
        if (ret == null) {
            throw new AtlasBaseException(AtlasErrorCode.TYPE_GUID_NOT_FOUND, guid);
        }
        return ret;
    }

    /**
     * Updates an enum def identified by name, first in the transient registry and then in the store.
     *
     * @throws AtlasBaseException {@code BAD_REQUEST} if the registry reports the name as not found,
     *                            otherwise whatever the registry or the store throws
     */
    @Override
    @GraphTransaction
    public AtlasEnumDef updateEnumDefByName(String name, AtlasEnumDef enumDef) throws AtlasBaseException {
        AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();

        tryUpdateByName(name, enumDef, ttr);

        return getEnumDefStore(ttr).updateByName(name, enumDef);
    }

    /**
     * Updates an enum def identified by GUID, first in the transient registry and then in the store.
     *
     * @throws AtlasBaseException {@code BAD_REQUEST} if the registry reports the GUID as not found,
     *                            otherwise whatever the registry or the store throws
     */
    @Override
    @GraphTransaction
    public AtlasEnumDef updateEnumDefByGuid(String guid, AtlasEnumDef enumDef) throws AtlasBaseException {
        AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();

        tryUpdateByGUID(guid, enumDef, ttr);

        return getEnumDefStore(ttr).updateByGuid(guid, enumDef);
    }

    /**
     * @param name struct type name
     * @return the struct def registered under {@code name}
     * @throws AtlasBaseException with {@code TYPE_NAME_NOT_FOUND} when the registry returns null
     */
    @Override
    public AtlasStructDef getStructDefByName(String name) throws AtlasBaseException {
        AtlasStructDef ret = typeRegistry.getStructDefByName(name);

        if (ret == null) {
            throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, name);
        }

        return ret;
    }

    /**
     * @param guid struct type GUID
     * @return the struct def registered under {@code guid}
     * @throws AtlasBaseException with {@code TYPE_GUID_NOT_FOUND} when the registry returns null
     */
    @Override
    public AtlasStructDef getStructDefByGuid(String guid) throws AtlasBaseException {
        AtlasStructDef ret = typeRegistry.getStructDefByGuid(guid);

        if (ret == null) {
            throw new AtlasBaseException(AtlasErrorCode.TYPE_GUID_NOT_FOUND, guid);
        }

        return ret;
    }

    /**
     * Updates a struct def identified by name, first in the transient registry and then in the store.
     *
     * @throws AtlasBaseException {@code BAD_REQUEST} if the registry reports the name as not found,
     *                            otherwise whatever the registry or the store throws
     */
    @Override
    @GraphTransaction
    public AtlasStructDef updateStructDefByName(String name, AtlasStructDef structDef) throws AtlasBaseException {
        AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();

        tryUpdateByName(name, structDef, ttr);

        return getStructDefStore(ttr).updateByName(name, structDef);
    }

    /**
     * Updates a struct def identified by GUID, first in the transient registry and then in the store.
     *
     * @throws AtlasBaseException {@code BAD_REQUEST} if the registry reports the GUID as not found,
     *                            otherwise whatever the registry or the store throws
     */
    @Override
    @GraphTransaction
    public AtlasStructDef updateStructDefByGuid(String guid, AtlasStructDef structDef) throws AtlasBaseException {
        AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();

        tryUpdateByGUID(guid, structDef, ttr);

        return getStructDefStore(ttr).updateByGuid(guid, structDef);
    }

    /**
     * @param name classification type name
     * @return the classification def registered under {@code name}
     * @throws AtlasBaseException with {@code TYPE_NAME_NOT_FOUND} when the registry returns null
     */
    @Override
    public AtlasClassificationDef getClassificationDefByName(String name) throws AtlasBaseException {
        AtlasClassificationDef ret = typeRegistry.getClassificationDefByName(name);

        if (ret == null) {
            throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, name);
        }

        return ret;
    }

    /**
     * @param guid classification type GUID
     * @return the classification def registered under {@code guid}
     * @throws AtlasBaseException with {@code TYPE_GUID_NOT_FOUND} when the registry returns null
     */
    @Override
    public AtlasClassificationDef getClassificationDefByGuid(String guid) throws AtlasBaseException {
        AtlasClassificationDef ret = typeRegistry.getClassificationDefByGuid(guid);

        if (ret == null) {
            throw new AtlasBaseException(AtlasErrorCode.TYPE_GUID_NOT_FOUND, guid);
        }

        return ret;
    }

    /**
     * Updates a classification def identified by name, first in the transient registry and then in
     * the store.
     *
     * @throws AtlasBaseException {@code BAD_REQUEST} if the registry reports the name as not found,
     *                            otherwise whatever the registry or the store throws
     */
    @Override
    @GraphTransaction
    public AtlasClassificationDef updateClassificationDefByName(String name, AtlasClassificationDef classificationDef)
            throws AtlasBaseException {
        AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();

        tryUpdateByName(name, classificationDef, ttr);

        return getClassificationDefStore(ttr).updateByName(name, classificationDef);
    }

    /**
     * Updates a classification def identified by GUID, first in the transient registry and then in
     * the store.
     *
     * @throws AtlasBaseException {@code BAD_REQUEST} if the registry reports the GUID as not found,
     *                            otherwise whatever the registry or the store throws
     */
    @Override
    @GraphTransaction
    public AtlasClassificationDef updateClassificationDefByGuid(String guid, AtlasClassificationDef classificationDef)
            throws AtlasBaseException {
        AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();

        tryUpdateByGUID(guid, classificationDef, ttr);

        return getClassificationDefStore(ttr).updateByGuid(guid, classificationDef);
    }

    /**
     * @param name entity type name
     * @return the entity def registered under {@code name}
     * @throws AtlasBaseException with {@code TYPE_NAME_NOT_FOUND} when the registry returns null
     */
    @Override
    public AtlasEntityDef getEntityDefByName(String name) throws AtlasBaseException {
        AtlasEntityDef ret = typeRegistry.getEntityDefByName(name);

        if (ret == null) {
            throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, name);
        }

        return ret;
    }

    /**
     * @param guid entity type GUID
     * @return the entity def registered under {@code guid}
     * @throws AtlasBaseException with {@code TYPE_GUID_NOT_FOUND} when the registry returns null
     */
    @Override
    public AtlasEntityDef getEntityDefByGuid(String guid) throws AtlasBaseException {
        AtlasEntityDef ret = typeRegistry.getEntityDefByGuid(guid);

        if (ret == null) {
            throw new AtlasBaseException(AtlasErrorCode.TYPE_GUID_NOT_FOUND, guid);
        }

        return ret;
    }

    /**
     * Updates an entity def identified by name, first in the transient registry and then in the store.
     *
     * @throws AtlasBaseException {@code BAD_REQUEST} if the registry reports the name as not found,
     *                            otherwise whatever the registry or the store throws
     */
    @Override
    @GraphTransaction
    public AtlasEntityDef updateEntityDefByName(String name, AtlasEntityDef entityDef) throws AtlasBaseException {
        AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();

        tryUpdateByName(name, entityDef, ttr);

        return getEntityDefStore(ttr).updateByName(name, entityDef);
    }

    /**
     * Updates an entity def identified by GUID, first in the transient registry and then in the store.
     *
     * @throws AtlasBaseException {@code BAD_REQUEST} if the registry reports the GUID as not found,
     *                            otherwise whatever the registry or the store throws
     */
    @Override
    @GraphTransaction
    public AtlasEntityDef updateEntityDefByGuid(String guid, AtlasEntityDef entityDef) throws AtlasBaseException {
        AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();

        tryUpdateByGUID(guid, entityDef, ttr);

        return getEntityDefStore(ttr).updateByGuid(guid, entityDef);
    }

    /**
     * Creates all type defs in {@code typesDef}: registers them in the transient registry and then
     * persists them through the def stores.
     *
     * @param typesDef defs to create
     * @return the defs as returned by the stores' create calls
     * @throws AtlasBaseException {@code BAD_REQUEST} if registration reports a name/GUID as not found,
     *                            otherwise whatever the registry or the stores throw
     */
    @Override
    @GraphTransaction
    public AtlasTypesDef createTypesDef(AtlasTypesDef typesDef) throws AtlasBaseException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("==> AtlasTypeDefGraphStore.createTypesDef(enums={}, structs={}, classifications={}, entities={})",
                    CollectionUtils.size(typesDef.getEnumDefs()),
                    CollectionUtils.size(typesDef.getStructDefs()),
                    CollectionUtils.size(typesDef.getClassificationDefs()),
                    CollectionUtils.size(typesDef.getEntityDefs()));
        }

        AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();
        tryTypeCreation(typesDef, ttr);

        AtlasTypesDef ret = addToGraphStore(typesDef, ttr);

        if (LOG.isDebugEnabled()) {
            LOG.debug("<== AtlasTypeDefGraphStore.createTypesDef(enums={}, structs={}, classfications={}, entities={})",
                    CollectionUtils.size(typesDef.getEnumDefs()),
                    CollectionUtils.size(typesDef.getStructDefs()),
                    CollectionUtils.size(typesDef.getClassificationDefs()),
                    CollectionUtils.size(typesDef.getEntityDefs()));
        }

        return ret;
    }

    /**
     * Creates {@code typesToCreate} and updates {@code typesToUpdate} under a single registry lock.
     *
     * <p>Updates are applied to the transient registry first (without reference resolution), then
     * the new types are registered and persisted, and finally the updates are persisted. A null or
     * empty {@code typesToUpdate} means there is nothing to update.</p>
     *
     * @param typesToCreate defs to create
     * @param typesToUpdate defs to update; may be null or empty
     * @return the created defs followed, per category, by the updated defs
     * @throws AtlasBaseException {@code BAD_REQUEST} if registration of the new types reports a
     *                            name/GUID as not found, otherwise whatever the registry or stores throw
     */
    @Override
    @GraphTransaction
    public AtlasTypesDef createUpdateTypesDef(AtlasTypesDef typesToCreate, AtlasTypesDef typesToUpdate) throws AtlasBaseException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("==> AtlasTypeDefGraphStore.createUpdateTypesDef({}, {})", typesToCreate, typesToUpdate);
        }

        AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();

        // Evaluated once; a null update set is treated the same as an empty one instead of failing
        // with a NullPointerException after the registry lock has already been taken.
        final boolean hasUpdates = typesToUpdate != null && !typesToUpdate.isEmpty();

        if (hasUpdates) {
            ttr.updateTypesWithNoRefResolve(typesToUpdate);
        }

        // Translate any NOT FOUND errors to BAD REQUEST
        tryTypeCreation(typesToCreate, ttr);

        AtlasTypesDef ret = addToGraphStore(typesToCreate, ttr);

        if (hasUpdates) {
            AtlasTypesDef updatedTypes = updateGraphStore(typesToUpdate, ttr);

            if (CollectionUtils.isNotEmpty(updatedTypes.getEnumDefs())) {
                ret.getEnumDefs().addAll(updatedTypes.getEnumDefs());
            }

            if (CollectionUtils.isNotEmpty(updatedTypes.getStructDefs())) {
                ret.getStructDefs().addAll(updatedTypes.getStructDefs());
            }

            if (CollectionUtils.isNotEmpty(updatedTypes.getClassificationDefs())) {
                ret.getClassificationDefs().addAll(updatedTypes.getClassificationDefs());
            }

            if (CollectionUtils.isNotEmpty(updatedTypes.getEntityDefs())) {
                ret.getEntityDefs().addAll(updatedTypes.getEntityDefs());
            }
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("<== AtlasTypeDefGraphStore.createUpdateTypesDef({}, {}): {}", typesToCreate, typesToUpdate, ret);
        }

        return ret;
    }

    /**
     * Updates all type defs in {@code typesDef}: applies them to the transient registry and then
     * persists them through the def stores.
     *
     * @param typesDef defs to update
     * @return the defs as returned by the stores' update calls
     * @throws AtlasBaseException {@code BAD_REQUEST} if the registry reports {@code TYPE_NAME_NOT_FOUND},
     *                            otherwise whatever the registry or the stores throw
     */
    @Override
    @GraphTransaction
    public AtlasTypesDef updateTypesDef(AtlasTypesDef typesDef) throws AtlasBaseException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("==> AtlasTypeDefGraphStore.updateTypesDef(enums={}, structs={}, classfications={}, entities={})",
                    CollectionUtils.size(typesDef.getEnumDefs()),
                    CollectionUtils.size(typesDef.getStructDefs()),
                    CollectionUtils.size(typesDef.getClassificationDefs()),
                    CollectionUtils.size(typesDef.getEntityDefs()));
        }

        AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();

        // Translate any NOT FOUND errors to BAD REQUEST
        try {
            ttr.updateTypes(typesDef);
        } catch (AtlasBaseException e) {
            if (AtlasErrorCode.TYPE_NAME_NOT_FOUND == e.getAtlasErrorCode()) {
                throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, e.getMessage());
            } else {
                throw e;
            }
        }

        AtlasTypesDef ret = updateGraphStore(typesDef, ttr);

        if (LOG.isDebugEnabled()) {
            LOG.debug("<== AtlasTypeDefGraphStore.updateTypesDef(enums={}, structs={}, classfications={}, entities={})",
                    CollectionUtils.size(typesDef.getEnumDefs()),
                    CollectionUtils.size(typesDef.getStructDefs()),
                    CollectionUtils.size(typesDef.getClassificationDefs()),
                    CollectionUtils.size(typesDef.getEntityDefs()));
        }

        return ret;

    }

    /**
     * Deletes all type defs in {@code typesDef} from the stores and the transient registry.
     *
     * <p>Each def is addressed by GUID when its GUID is non-blank, otherwise by name. Struct,
     * classification and entity defs are deleted in two passes: every pre-delete call runs before
     * the first actual delete, and the i-th pre-delete result is handed to the i-th delete call.
     * Enum defs are deleted last, in a single pass.</p>
     *
     * @param typesDef defs to delete
     * @throws AtlasBaseException if locking the registry or any store call fails
     */
    @Override
    @GraphTransaction
    public void deleteTypesDef(AtlasTypesDef typesDef) throws AtlasBaseException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("==> AtlasTypeDefGraphStore.deleteTypesDef(enums={}, structs={}, classfications={}, entities={})",
                    CollectionUtils.size(typesDef.getEnumDefs()),
                    CollectionUtils.size(typesDef.getStructDefs()),
                    CollectionUtils.size(typesDef.getClassificationDefs()),
                    CollectionUtils.size(typesDef.getEntityDefs()));
        }

        AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();

        AtlasEnumDefStore           enumDefStore     = getEnumDefStore(ttr);
        AtlasStructDefStore         structDefStore   = getStructDefStore(ttr);
        AtlasClassificationDefStore classifiDefStore = getClassificationDefStore(ttr);
        AtlasEntityDefStore         entityDefStore   = getEntityDefStore(ttr);

        // Pre-delete results, index-aligned with the corresponding def list in typesDef
        List<Object> preDeleteStructDefs   = new ArrayList<>();
        List<Object> preDeleteClassifiDefs = new ArrayList<>();
        List<Object> preDeleteEntityDefs   = new ArrayList<>();

        // Pass 1: pre-delete for every struct, classification and entity def
        if (CollectionUtils.isNotEmpty(typesDef.getStructDefs())) {
            for (AtlasStructDef structDef : typesDef.getStructDefs()) {
                if (StringUtils.isNotBlank(structDef.getGuid())) {
                    preDeleteStructDefs.add(structDefStore.preDeleteByGuid(structDef.getGuid()));
                } else {
                    preDeleteStructDefs.add(structDefStore.preDeleteByName(structDef.getName()));
                }
            }
        }

        if (CollectionUtils.isNotEmpty(typesDef.getClassificationDefs())) {
            for (AtlasClassificationDef classifiDef : typesDef.getClassificationDefs()) {
                if (StringUtils.isNotBlank(classifiDef.getGuid())) {
                    preDeleteClassifiDefs.add(classifiDefStore.preDeleteByGuid(classifiDef.getGuid()));
                } else {
                    preDeleteClassifiDefs.add(classifiDefStore.preDeleteByName(classifiDef.getName()));
                }
            }
        }

        if (CollectionUtils.isNotEmpty(typesDef.getEntityDefs())) {
            for (AtlasEntityDef entityDef : typesDef.getEntityDefs()) {
                if (StringUtils.isNotBlank(entityDef.getGuid())) {
                    preDeleteEntityDefs.add(entityDefStore.preDeleteByGuid(entityDef.getGuid()));
                } else {
                    preDeleteEntityDefs.add(entityDefStore.preDeleteByName(entityDef.getName()));
                }
            }
        }

        // Pass 2: actual deletes, each fed the pre-delete result gathered at the same index
        if (CollectionUtils.isNotEmpty(typesDef.getStructDefs())) {
            int i = 0;
            for (AtlasStructDef structDef : typesDef.getStructDefs()) {
                if (StringUtils.isNotBlank(structDef.getGuid())) {
                    structDefStore.deleteByGuid(structDef.getGuid(), preDeleteStructDefs.get(i));
                } else {
                    structDefStore.deleteByName(structDef.getName(), preDeleteStructDefs.get(i));
                }
                i++;
            }
        }

        if (CollectionUtils.isNotEmpty(typesDef.getClassificationDefs())) {
            int i = 0;
            for (AtlasClassificationDef classifiDef : typesDef.getClassificationDefs()) {
                if (StringUtils.isNotBlank(classifiDef.getGuid())) {
                    classifiDefStore.deleteByGuid(classifiDef.getGuid(), preDeleteClassifiDefs.get(i));
                } else {
                    classifiDefStore.deleteByName(classifiDef.getName(), preDeleteClassifiDefs.get(i));
                }
                i++;
            }
        }

        if (CollectionUtils.isNotEmpty(typesDef.getEntityDefs())) {
            int i = 0;
            for (AtlasEntityDef entityDef : typesDef.getEntityDefs()) {
                if (StringUtils.isNotBlank(entityDef.getGuid())) {
                    entityDefStore.deleteByGuid(entityDef.getGuid(), preDeleteEntityDefs.get(i));
                } else {
                    entityDefStore.deleteByName(entityDef.getName(), preDeleteEntityDefs.get(i));
                }
                i++;
            }
        }

        // Enum defs have no pre-delete step
        if (CollectionUtils.isNotEmpty(typesDef.getEnumDefs())) {
            for (AtlasEnumDef enumDef : typesDef.getEnumDefs()) {
                if (StringUtils.isNotBlank(enumDef.getGuid())) {
                    enumDefStore.deleteByGuid(enumDef.getGuid());
                } else {
                    enumDefStore.deleteByName(enumDef.getName());
                }
            }
        }

        // Remove the deleted defs from the transient registry as well
        ttr.removeTypesDef(typesDef);

        if (LOG.isDebugEnabled()) {
            LOG.debug("<== AtlasTypeDefGraphStore.deleteTypesDef(enums={}, structs={}, classfications={}, entities={})",
                    CollectionUtils.size(typesDef.getEnumDefs()),
                    CollectionUtils.size(typesDef.getStructDefs()),
                    CollectionUtils.size(typesDef.getClassificationDefs()),
                    CollectionUtils.size(typesDef.getEntityDefs()));
        }
    }

    /**
     * Returns the defs of all registered enum, struct, classification and entity types that satisfy
     * the predicate built from {@code searchFilter}.
     *
     * @param searchFilter filter converted to a predicate by {@link FilterUtil}
     * @return a new {@link AtlasTypesDef} holding the matching defs, grouped by category
     */
    @Override
    public AtlasTypesDef searchTypesDef(SearchFilter searchFilter) throws AtlasBaseException {
        final AtlasTypesDef typesDef = new AtlasTypesDef();
        Predicate searchPredicates = FilterUtil.getPredicateFromSearchFilter(searchFilter);

        for(AtlasEnumType enumType : typeRegistry.getAllEnumTypes()) {
            if (searchPredicates.evaluate(enumType)) {
                typesDef.getEnumDefs().add(enumType.getEnumDef());
            }
        }

        for(AtlasStructType structType : typeRegistry.getAllStructTypes()) {
            if (searchPredicates.evaluate(structType)) {
                typesDef.getStructDefs().add(structType.getStructDef());
            }
        }

        for(AtlasClassificationType classificationType : typeRegistry.getAllClassificationTypes()) {
            if (searchPredicates.evaluate(classificationType)) {
                typesDef.getClassificationDefs().add(classificationType.getClassificationDef());
            }
        }

        for(AtlasEntityType entityType : typeRegistry.getAllEntityTypes()) {
            if (searchPredicates.evaluate(entityType)) {
                typesDef.getEntityDefs().add(entityType.getEntityDef());
            }
        }

        return typesDef;
    }

    /**
     * Looks up a type by name and returns its def.
     *
     * @param name type name; must not be blank
     * @return the enum, struct, classification or entity def of the type
     * @throws AtlasBaseException {@code TYPE_NAME_INVALID} for a blank name, {@code TYPE_NAME_NOT_FOUND}
     *                            when the registry returns null, {@code SYSTEM_TYPE} for any other
     *                            type category
     */
    @Override
    public AtlasBaseTypeDef getByName(String name) throws AtlasBaseException {
        if (StringUtils.isBlank(name)) {
            throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_INVALID, "", name);
        }
        AtlasType type = typeRegistry.getType(name);
        // Guard against a null lookup result so callers get the same NOT_FOUND error as the typed
        // getters instead of a NullPointerException from getTypeDefFromType().
        if (type == null) {
            throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, name);
        }
        return getTypeDefFromType(type);
    }

    /**
     * Looks up a type by GUID and returns its def.
     *
     * @param guid type GUID; must not be blank
     * @return the enum, struct, classification or entity def of the type
     * @throws AtlasBaseException {@code TYPE_GUID_NOT_FOUND} for a blank GUID or when the registry
     *                            returns null, {@code SYSTEM_TYPE} for any other type category
     */
    @Override
    public AtlasBaseTypeDef getByGuid(String guid) throws AtlasBaseException {
        if (StringUtils.isBlank(guid)) {
            throw new AtlasBaseException(AtlasErrorCode.TYPE_GUID_NOT_FOUND, guid);
        }
        AtlasType type = typeRegistry.getTypeByGuid(guid);
        // Guard against a null lookup result so callers get the same NOT_FOUND error as the typed
        // getters instead of a NullPointerException from getTypeDefFromType().
        if (type == null) {
            throw new AtlasBaseException(AtlasErrorCode.TYPE_GUID_NOT_FOUND, guid);
        }
        return getTypeDefFromType(type);
    }

    /**
     * Re-initializes the store when this instance becomes active. A failure of {@link #init()} is
     * logged and not propagated.
     */
    @Override
    public void instanceIsActive() throws AtlasException {
        try {
            init();
        } catch (AtlasBaseException e) {
            LOG.error("Failed to init after becoming active", e);
        }
    }

    /** Only logs; nothing is done when this instance becomes passive. */
    @Override
    public void instanceIsPassive() throws AtlasException {
        LOG.info("Not reacting to a Passive state change");
    }

    /**
     * Runs the type-def store initializer against the {@code models} directory located under the
     * {@code atlas.home} system property, or under the current directory when that property is empty.
     */
    private void bootstrapTypes() {
        AtlasTypeDefStoreInitializer storeInitializer = new AtlasTypeDefStoreInitializer();

        String atlasHomeDir = System.getProperty("atlas.home");
        String typesDirName = (StringUtils.isEmpty(atlasHomeDir) ? "." : atlasHomeDir) + File.separator + "models";

        storeInitializer.initializeStore(this, typeRegistry, typesDirName);
    }

    /**
     * Extracts the def from an enum, struct, classification or entity type.
     *
     * @param type a non-null type
     * @return the def carried by {@code type}
     * @throws AtlasBaseException {@code SYSTEM_TYPE} for every other type category
     */
    private AtlasBaseTypeDef getTypeDefFromType(AtlasType type) throws AtlasBaseException {
        AtlasBaseTypeDef ret;
        switch (type.getTypeCategory()) {
            case ENUM:
                ret = ((AtlasEnumType) type).getEnumDef();
                break;
            case STRUCT:
                ret = ((AtlasStructType) type).getStructDef();
                break;
            case CLASSIFICATION:
                ret = ((AtlasClassificationType) type).getClassificationDef();
                break;
            case ENTITY:
                ret = ((AtlasEntityType) type).getEntityDef();
                break;
            case PRIMITIVE:
            case OBJECT_ID_TYPE:
            case ARRAY:
            case MAP:
            default:
                throw new AtlasBaseException(AtlasErrorCode.SYSTEM_TYPE, type.getTypeCategory().name());
        }
        return ret;
    }

    /**
     * Locks the type registry for update and creates a {@link TypeRegistryUpdateHook} whose
     * {@code onComplete} releases the lock.
     *
     * @return the transient registry on which the caller stages its changes
     * @throws AtlasBaseException if the registry cannot be locked
     */
    private AtlasTransientTypeRegistry lockTypeRegistryAndReleasePostCommit() throws AtlasBaseException {
        AtlasTransientTypeRegistry ttr = typeRegistry.lockTypeRegistryForUpdate(typeUpdateLockMaxWaitTimeSeconds);

        // The instance is not kept here; presumably the PostTransactionHook super constructor
        // registers it with the current graph transaction -- TODO confirm.
        new TypeRegistryUpdateHook(ttr);

        return ttr;
    }

    /**
     * Strips invalid ownedRef constraints from the struct, classification and entity defs in
     * {@code typesDef}, using the entity-def names in {@code typesDef} as the set of valid targets.
     */
    private void rectifyTypeErrorsIfAny(AtlasTypesDef typesDef) {
        final Set<String> entityNames = new HashSet<>();

        if (CollectionUtils.isNotEmpty(typesDef.getEntityDefs())) {
            for (AtlasEntityDef entityDef : typesDef.getEntityDefs()) {
                entityNames.add(entityDef.getName());
            }
        }

        if (CollectionUtils.isNotEmpty(typesDef.getStructDefs())) {
            for (AtlasStructDef structDef : typesDef.getStructDefs()) {
                rectifyAttributesIfNeeded(entityNames, structDef);
            }
        }

        if (CollectionUtils.isNotEmpty(typesDef.getClassificationDefs())) {
            for (AtlasClassificationDef classificationDef : typesDef.getClassificationDefs()) {
                rectifyAttributesIfNeeded(entityNames, classificationDef);
            }
        }

        if (CollectionUtils.isNotEmpty(typesDef.getEntityDefs())) {
            for (AtlasEntityDef entityDef : typesDef.getEntityDefs()) {
                rectifyAttributesIfNeeded(entityNames, entityDef);
            }
        }
    }

    /**
     * For every attribute of {@code structDef} that carries an ownedRef constraint, removes that
     * constraint unless at least one type name referenced by the attribute is in {@code entityNames}.
     */
    private void rectifyAttributesIfNeeded(final Set<String> entityNames, AtlasStructDef structDef) {
        List<AtlasAttributeDef> attributeDefs = structDef.getAttributeDefs();

        if (CollectionUtils.isNotEmpty(attributeDefs)) {
            for (AtlasAttributeDef attributeDef : attributeDefs) {
                if (!hasOwnedReferenceConstraint(attributeDef.getConstraints())) {
                    continue;
                }

                Set<String> referencedTypeNames = AtlasTypeUtil.getReferencedTypeNames(attributeDef.getTypeName());

                boolean valid = false;

                for (String referencedTypeName : referencedTypeNames) {
                    if (entityNames.contains(referencedTypeName)) {
                        valid = true;
                        break;
                    }
                }

                if (!valid) {
                    rectifyOwnedReferenceError(structDef, attributeDef);
                }
            }
        }
    }

    /** @return true if any constraint in the list is of the ownedRef constraint type */
    private boolean hasOwnedReferenceConstraint(List<AtlasConstraintDef> constraints) {
        if (CollectionUtils.isNotEmpty(constraints)) {
            for (AtlasConstraintDef constraint : constraints) {
                if (constraint.isConstraintType(AtlasConstraintDef.CONSTRAINT_TYPE_OWNED_REF)) {
                    return true;
                }
            }
        }

        return false;
    }

    /**
     * Removes every ownedRef constraint from {@code attributeDef}, logging a warning for each one
     * removed.
     */
    private void rectifyOwnedReferenceError(AtlasStructDef structDef, AtlasAttributeDef attributeDef) {
        List<AtlasConstraintDef> constraints = attributeDef.getConstraints();

        if (CollectionUtils.isNotEmpty(constraints)) {
            for (int i = 0; i < constraints.size(); i++) {
                AtlasConstraintDef constraint = constraints.get(i);

                if (constraint.isConstraintType(AtlasConstraintDef.CONSTRAINT_TYPE_OWNED_REF)) {
                    LOG.warn("Invalid constraint ownedRef for attribute {}.{}", structDef.getName(), attributeDef.getName());

                    constraints.remove(i);
                    // The following element has shifted into slot i; step back so it is not skipped
                    i--;
                }
            }
        }
    }

    /**
     * Persists new defs through the def stores and records each created def's GUID in {@code ttr}.
     *
     * <p>Enum defs are created first. Struct, classification and entity defs are created in two
     * passes: every pre-create call runs before the first actual create, and the i-th pre-create
     * result is handed to the i-th create call.</p>
     *
     * @return the created defs, grouped by category
     */
    private AtlasTypesDef addToGraphStore(AtlasTypesDef typesDef, AtlasTransientTypeRegistry ttr) throws AtlasBaseException {
        AtlasTypesDef ret = new AtlasTypesDef();

        AtlasEnumDefStore           enumDefStore     = getEnumDefStore(ttr);
        AtlasStructDefStore         structDefStore   = getStructDefStore(ttr);
        AtlasClassificationDefStore classifiDefStore = getClassificationDefStore(ttr);
        AtlasEntityDefStore         entityDefStore   = getEntityDefStore(ttr);

        // Pre-create results, index-aligned with the corresponding def list in typesDef
        List<Object> preCreateStructDefs   = new ArrayList<>();
        List<Object> preCreateClassifiDefs = new ArrayList<>();
        List<Object> preCreateEntityDefs   = new ArrayList<>();

        if (CollectionUtils.isNotEmpty(typesDef.getEnumDefs())) {
            for (AtlasEnumDef enumDef : typesDef.getEnumDefs()) {
                AtlasEnumDef createdDef = enumDefStore.create(enumDef);

                ttr.updateGuid(createdDef.getName(), createdDef.getGuid());

                ret.getEnumDefs().add(createdDef);
            }
        }

        // Pass 1: pre-create for every struct, classification and entity def
        if (CollectionUtils.isNotEmpty(typesDef.getStructDefs())) {
            for (AtlasStructDef structDef : typesDef.getStructDefs()) {
                preCreateStructDefs.add(structDefStore.preCreate(structDef));
            }
        }

        if (CollectionUtils.isNotEmpty(typesDef.getClassificationDefs())) {
            for (AtlasClassificationDef classifiDef : typesDef.getClassificationDefs()) {
                preCreateClassifiDefs.add(classifiDefStore.preCreate(classifiDef));
            }
        }

        if (CollectionUtils.isNotEmpty(typesDef.getEntityDefs())) {
            for (AtlasEntityDef entityDef : typesDef.getEntityDefs()) {
                preCreateEntityDefs.add(entityDefStore.preCreate(entityDef));
            }
        }

        // Pass 2: actual creates, each fed the pre-create result gathered at the same index
        if (CollectionUtils.isNotEmpty(typesDef.getStructDefs())) {
            int i = 0;
            for (AtlasStructDef structDef : typesDef.getStructDefs()) {
                AtlasStructDef createdDef = structDefStore.create(structDef, preCreateStructDefs.get(i));

                ttr.updateGuid(createdDef.getName(), createdDef.getGuid());

                ret.getStructDefs().add(createdDef);
                i++;
            }
        }

        if (CollectionUtils.isNotEmpty(typesDef.getClassificationDefs())) {
            int i = 0;
            for (AtlasClassificationDef classifiDef : typesDef.getClassificationDefs()) {
                AtlasClassificationDef createdDef = classifiDefStore.create(classifiDef, preCreateClassifiDefs.get(i));

                ttr.updateGuid(createdDef.getName(), createdDef.getGuid());

                ret.getClassificationDefs().add(createdDef);
                i++;
            }
        }

        if (CollectionUtils.isNotEmpty(typesDef.getEntityDefs())) {
            int i = 0;
            for (AtlasEntityDef entityDef : typesDef.getEntityDefs()) {
                AtlasEntityDef createdDef = entityDefStore.create(entityDef, preCreateEntityDefs.get(i));

                ttr.updateGuid(createdDef.getName(), createdDef.getGuid());

                ret.getEntityDefs().add(createdDef);
                i++;
            }
        }

        return ret;
    }

    /**
     * Persists updated defs through the def stores, in the order enum, struct, classification,
     * entity.
     *
     * @return the defs as returned by the stores' update calls, grouped by category
     */
    private AtlasTypesDef updateGraphStore(AtlasTypesDef typesDef, AtlasTransientTypeRegistry ttr) throws AtlasBaseException {
        AtlasTypesDef ret = new AtlasTypesDef();

        AtlasEnumDefStore           enumDefStore     = getEnumDefStore(ttr);
        AtlasStructDefStore         structDefStore   = getStructDefStore(ttr);
        AtlasClassificationDefStore classifiDefStore = getClassificationDefStore(ttr);
        AtlasEntityDefStore         entityDefStore   = getEntityDefStore(ttr);

        if (CollectionUtils.isNotEmpty(typesDef.getEnumDefs())) {
            for (AtlasEnumDef enumDef : typesDef.getEnumDefs()) {
                ret.getEnumDefs().add(enumDefStore.update(enumDef));
            }
        }

        if (CollectionUtils.isNotEmpty(typesDef.getStructDefs())) {
            for (AtlasStructDef structDef : typesDef.getStructDefs()) {
                ret.getStructDefs().add(structDefStore.update(structDef));
            }
        }

        if (CollectionUtils.isNotEmpty(typesDef.getClassificationDefs())) {
            for (AtlasClassificationDef classifiDef : typesDef.getClassificationDefs()) {
                ret.getClassificationDefs().add(classifiDefStore.update(classifiDef));
            }
        }

        if (CollectionUtils.isNotEmpty(typesDef.getEntityDefs())) {
            for (AtlasEntityDef entityDef : typesDef.getEntityDefs()) {
                ret.getEntityDefs().add(entityDefStore.update(entityDef));
            }
        }

        return ret;
    }

    /**
     * Post-transaction hook that releases the type-registry update lock and, when the transaction
     * succeeded, notifies the registered {@link TypeDefChangeListener}s.
     */
    private class TypeRegistryUpdateHook extends GraphTransactionInterceptor.PostTransactionHook {

        private final AtlasTransientTypeRegistry ttr;

        private TypeRegistryUpdateHook(AtlasTransientTypeRegistry ttr) {
            super();

            this.ttr = ttr;
        }

        /**
         * Releases the registry lock, committing the transient registry only when
         * {@code isSuccess} is true, then notifies listeners on success.
         */
        @Override
        public void onComplete(boolean isSuccess) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("==> TypeRegistryUpdateHook.onComplete({})", isSuccess);
            }

            typeRegistry.releaseTypeRegistryForUpdate(ttr, isSuccess);

            if (isSuccess) {
                notifyListeners(ttr);
            }

            if (LOG.isDebugEnabled()) {
                LOG.debug("<== TypeRegistryUpdateHook.onComplete({})", isSuccess);
            }
        }

        /**
         * Sends the added, updated and deleted types recorded in {@code ttr} to every listener.
         */
        private void notifyListeners(AtlasTransientTypeRegistry ttr) {
            if (CollectionUtils.isNotEmpty(typeDefChangeListeners)) {
                ChangedTypeDefs changedTypeDefs = new ChangedTypeDefs(ttr.getAddedTypes(),
                        ttr.getUpdatedTypes(),
                        ttr.getDeleteedTypes());

                for (TypeDefChangeListener changeListener : typeDefChangeListeners) {
                    try {
                        changeListener.onChange(changedTypeDefs);
                    } catch (Throwable t) {
                        // One failing listener is logged and must not stop the remaining ones
                        LOG.error("OnChange failed for listener {}", changeListener.getClass().getName(), t);
                    }
                }
            }
        }

    }

    /**
     * Applies an update-by-name to the transient registry, rethrowing {@code TYPE_NAME_NOT_FOUND}
     * as {@code BAD_REQUEST}; every other error is propagated unchanged.
     */
    private void tryUpdateByName(String name, AtlasBaseTypeDef typeDef, AtlasTransientTypeRegistry ttr) throws AtlasBaseException {
        try {
            ttr.updateTypeByName(name, typeDef);
        } catch (AtlasBaseException e) {
            if (AtlasErrorCode.TYPE_NAME_NOT_FOUND == e.getAtlasErrorCode()) {
                throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, e.getMessage());
            } else {
                throw e;
            }
        }
    }

    /**
     * Applies an update-by-GUID to the transient registry, rethrowing {@code TYPE_GUID_NOT_FOUND}
     * as {@code BAD_REQUEST}; every other error is propagated unchanged.
     */
    private void tryUpdateByGUID(String guid, AtlasBaseTypeDef typeDef, AtlasTransientTypeRegistry ttr) throws AtlasBaseException {
        try {
            ttr.updateTypeByGuid(guid, typeDef);
        } catch (AtlasBaseException e) {
            if (AtlasErrorCode.TYPE_GUID_NOT_FOUND == e.getAtlasErrorCode()) {
                throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, e.getMessage());
            } else {
                throw e;
            }
        }
    }

    /**
     * Adds the given types to the transient registry, rethrowing {@code TYPE_NAME_NOT_FOUND} and
     * {@code TYPE_GUID_NOT_FOUND} as {@code BAD_REQUEST}; every other error is propagated unchanged.
     */
    private void tryTypeCreation(AtlasTypesDef typesDef, AtlasTransientTypeRegistry ttr) throws AtlasBaseException {
        // Translate any NOT FOUND errors to BAD REQUEST
        try {
            ttr.addTypes(typesDef);
        } catch (AtlasBaseException e) {
            if (AtlasErrorCode.TYPE_NAME_NOT_FOUND == e.getAtlasErrorCode() ||
                    AtlasErrorCode.TYPE_GUID_NOT_FOUND == e.getAtlasErrorCode()) {
                throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, e.getMessage());
            } else {
                throw e;
            }
        }
    }
}
