blob: 9924b2e4ce2c7b2d44b64657b300e1592b2bd394 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.graph;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.AtlasException;
import org.apache.atlas.discovery.SearchIndexer;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.listener.ActiveStateChangeHandler;
import org.apache.atlas.listener.ChangedTypeDefs;
import org.apache.atlas.listener.TypeDefChangeListener;
import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
import org.apache.atlas.model.typedef.AtlasEntityDef;
import org.apache.atlas.model.typedef.AtlasEnumDef;
import org.apache.atlas.model.typedef.AtlasRelationshipDef;
import org.apache.atlas.model.typedef.AtlasStructDef;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.IndexException;
import org.apache.atlas.repository.RepositoryException;
import org.apache.atlas.repository.graphdb.*;
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
import org.apache.atlas.type.*;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import javax.inject.Inject;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.*;
import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.*;
import static org.apache.atlas.repository.Constants.*;
import static org.apache.atlas.repository.graphdb.AtlasCardinality.LIST;
import static org.apache.atlas.repository.graphdb.AtlasCardinality.SET;
import static org.apache.atlas.repository.graphdb.AtlasCardinality.SINGLE;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.isReference;
import static org.apache.atlas.type.AtlasStructType.UNIQUE_ATTRIBUTE_SHADE_PROPERTY_PREFIX;
import static org.apache.atlas.type.AtlasTypeUtil.isArrayType;
import static org.apache.atlas.type.AtlasTypeUtil.isMapType;
import static org.apache.atlas.type.Constants.PENDING_TASKS_PROPERTY_KEY;
/**
* Adds index for properties of a given type when its added before any instances are added.
*/
@Component
@Order(1)
public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChangeHandler, TypeDefChangeListener {
private static final Logger LOG = LoggerFactory.getLogger(GraphBackedSearchIndexer.class);
private static final String VERTEX_ID_IN_IMPORT_KEY = "__vIdInImport";
private static final String EDGE_ID_IN_IMPORT_KEY = "__eIdInImport";
private static final List<Class> INDEX_EXCLUSION_CLASSES = new ArrayList() {
{
add(Boolean.class);
add(BigDecimal.class);
add(BigInteger.class);
}
};
// Added for type lookup when indexing the new typedefs
private final AtlasTypeRegistry typeRegistry;
private final List<IndexChangeListener> indexChangeListeners = new ArrayList<>();
//allows injection of a dummy graph for testing
private IAtlasGraphProvider provider;
private boolean recomputeIndexedKeys = true;
private Set<String> vertexIndexKeys = new HashSet<>();
public static boolean isValidSearchWeight(int searchWeight) {
if (searchWeight != -1 ) {
if (searchWeight < 1 || searchWeight > 10) {
return false;
}
}
return true;
}
public static boolean isStringAttribute(AtlasAttribute attribute) {
return AtlasBaseTypeDef.ATLAS_TYPE_STRING.equals(attribute.getTypeName());
}
public enum UniqueKind { NONE, GLOBAL_UNIQUE, PER_TYPE_UNIQUE }
@Inject
public GraphBackedSearchIndexer(AtlasTypeRegistry typeRegistry) throws AtlasException {
this(new AtlasGraphProvider(), ApplicationProperties.get(), typeRegistry);
}
@VisibleForTesting
GraphBackedSearchIndexer(IAtlasGraphProvider provider, Configuration configuration, AtlasTypeRegistry typeRegistry)
throws IndexException, RepositoryException {
this.provider = provider;
this.typeRegistry = typeRegistry;
//make sure solr index follows graph backed index listener
addIndexListener(new SolrIndexHelper(typeRegistry));
if (!HAConfiguration.isHAEnabled(configuration)) {
initialize(provider.get());
}
notifyInitializationStart();
}
public void addIndexListener(IndexChangeListener listener) {
indexChangeListeners.add(listener);
}
/**
* Initialize global indices for JanusGraph on server activation.
*
* Since the indices are shared state, we need to do this only from an active instance.
*/
@Override
public void instanceIsActive() throws AtlasException {
LOG.info("Reacting to active: initializing index");
try {
initialize();
} catch (RepositoryException | IndexException e) {
throw new AtlasException("Error in reacting to active on initialization", e);
}
}
@Override
public void instanceIsPassive() {
LOG.info("Reacting to passive state: No action right now.");
}
@Override
public int getHandlerOrder() {
return HandlerOrder.GRAPH_BACKED_SEARCH_INDEXER.getOrder();
}
@Override
public void onChange(ChangedTypeDefs changedTypeDefs) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("Processing changed typedefs {}", changedTypeDefs);
}
AtlasGraphManagement management = null;
try {
management = provider.get().getManagementSystem();
// Update index for newly created types
if (CollectionUtils.isNotEmpty(changedTypeDefs.getCreatedTypeDefs())) {
for (AtlasBaseTypeDef typeDef : changedTypeDefs.getCreatedTypeDefs()) {
updateIndexForTypeDef(management, typeDef);
}
}
// Update index for updated types
if (CollectionUtils.isNotEmpty(changedTypeDefs.getUpdatedTypeDefs())) {
for (AtlasBaseTypeDef typeDef : changedTypeDefs.getUpdatedTypeDefs()) {
updateIndexForTypeDef(management, typeDef);
}
}
// Invalidate the property key for deleted types
if (CollectionUtils.isNotEmpty(changedTypeDefs.getDeletedTypeDefs())) {
for (AtlasBaseTypeDef typeDef : changedTypeDefs.getDeletedTypeDefs()) {
deleteIndexForType(management, typeDef);
}
}
//resolve index fields names for the new entity attributes.
resolveIndexFieldNames(management, changedTypeDefs);
createEdgeLabels(management, changedTypeDefs.getCreatedTypeDefs());
createEdgeLabels(management, changedTypeDefs.getUpdatedTypeDefs());
//Commit indexes
commit(management);
} catch (RepositoryException | IndexException e) {
LOG.error("Failed to update indexes for changed typedefs", e);
attemptRollback(changedTypeDefs, management);
}
notifyChangeListeners(changedTypeDefs);
}
@Override
public void onLoadCompletion() throws AtlasBaseException {
if(LOG.isDebugEnabled()) {
LOG.debug("Type definition load completed. Informing the completion to IndexChangeListeners.");
}
Collection<AtlasBaseTypeDef> typeDefs = new ArrayList<>();
typeDefs.addAll(typeRegistry.getAllEntityDefs());
typeDefs.addAll(typeRegistry.getAllBusinessMetadataDefs());
ChangedTypeDefs changedTypeDefs = new ChangedTypeDefs(null, new ArrayList<>(typeDefs), null);
AtlasGraphManagement management = null;
try {
management = provider.get().getManagementSystem();
//resolve index fields names
resolveIndexFieldNames(management, changedTypeDefs);
//Commit indexes
commit(management);
notifyInitializationCompletion(changedTypeDefs);
} catch (RepositoryException | IndexException e) {
LOG.error("Failed to update indexes for changed typedefs", e);
attemptRollback(changedTypeDefs, management);
}
}
public Set<String> getVertexIndexKeys() {
if (recomputeIndexedKeys) {
AtlasGraphManagement management = null;
try {
management = provider.get().getManagementSystem();
if (management != null) {
AtlasGraphIndex vertexIndex = management.getGraphIndex(VERTEX_INDEX);
if (vertexIndex != null) {
recomputeIndexedKeys = false;
Set<String> indexKeys = new HashSet<>();
for (AtlasPropertyKey fieldKey : vertexIndex.getFieldKeys()) {
indexKeys.add(fieldKey.getName());
}
vertexIndexKeys = indexKeys;
}
management.commit();
}
} catch (Exception excp) {
LOG.error("getVertexIndexKeys(): failed to get indexedKeys from graph", excp);
if (management != null) {
try {
management.rollback();
} catch (Exception e) {
LOG.error("getVertexIndexKeys(): rollback failed", e);
}
}
}
}
return vertexIndexKeys;
}
/**
* Initializes the indices for the graph - create indices for Global AtlasVertex Keys
*/
private void initialize() throws RepositoryException, IndexException {
initialize(provider.get());
}
/**
* Initializes the indices for the graph - create indices for Global AtlasVertex and AtlasEdge Keys
*/
private void initialize(AtlasGraph graph) throws RepositoryException, IndexException {
AtlasGraphManagement management = graph.getManagementSystem();
try {
LOG.info("Creating indexes for graph.");
if (management.getGraphIndex(VERTEX_INDEX) == null) {
management.createVertexMixedIndex(VERTEX_INDEX, BACKING_INDEX, Collections.emptyList());
LOG.info("Created index : {}", VERTEX_INDEX);
}
if (management.getGraphIndex(EDGE_INDEX) == null) {
management.createEdgeMixedIndex(EDGE_INDEX, BACKING_INDEX, Collections.emptyList());
LOG.info("Created index : {}", EDGE_INDEX);
}
if (management.getGraphIndex(FULLTEXT_INDEX) == null) {
management.createFullTextMixedIndex(FULLTEXT_INDEX, BACKING_INDEX, Collections.emptyList());
LOG.info("Created index : {}", FULLTEXT_INDEX);
}
// create vertex indexes
createCommonVertexIndex(management, GUID_PROPERTY_KEY, UniqueKind.GLOBAL_UNIQUE, String.class, SINGLE, true, false);
createCommonVertexIndex(management, HISTORICAL_GUID_PROPERTY_KEY, UniqueKind.GLOBAL_UNIQUE, String.class, SINGLE, true, false);
createCommonVertexIndex(management, TYPENAME_PROPERTY_KEY, UniqueKind.GLOBAL_UNIQUE, String.class, SINGLE, true, false);
createCommonVertexIndex(management, TYPESERVICETYPE_PROPERTY_KEY, UniqueKind.NONE, String.class, SINGLE, true, false);
createCommonVertexIndex(management, VERTEX_TYPE_PROPERTY_KEY, UniqueKind.NONE, String.class, SINGLE, true, false);
createCommonVertexIndex(management, VERTEX_ID_IN_IMPORT_KEY, UniqueKind.NONE, Long.class, SINGLE, true, false);
createCommonVertexIndex(management, ENTITY_TYPE_PROPERTY_KEY, UniqueKind.NONE, String.class, SINGLE, true, false);
createCommonVertexIndex(management, SUPER_TYPES_PROPERTY_KEY, UniqueKind.NONE, String.class, SET, true, false);
createCommonVertexIndex(management, TIMESTAMP_PROPERTY_KEY, UniqueKind.NONE, Long.class, SINGLE, false, false);
createCommonVertexIndex(management, MODIFICATION_TIMESTAMP_PROPERTY_KEY, UniqueKind.NONE, Long.class, SINGLE, false, false);
createCommonVertexIndex(management, STATE_PROPERTY_KEY, UniqueKind.NONE, String.class, SINGLE, false, false);
createCommonVertexIndex(management, CREATED_BY_KEY, UniqueKind.NONE, String.class, SINGLE, false, false, true);
createCommonVertexIndex(management, CLASSIFICATION_TEXT_KEY, UniqueKind.NONE, String.class, SINGLE, false, false);
createCommonVertexIndex(management, MODIFIED_BY_KEY, UniqueKind.NONE, String.class, SINGLE, false, false, true);
createCommonVertexIndex(management, CLASSIFICATION_NAMES_KEY, UniqueKind.NONE, String.class, SINGLE, true, false);
createCommonVertexIndex(management, PROPAGATED_CLASSIFICATION_NAMES_KEY, UniqueKind.NONE, String.class, SINGLE, true, false);
createCommonVertexIndex(management, TRAIT_NAMES_PROPERTY_KEY, UniqueKind.NONE, String.class, SET, true, true);
createCommonVertexIndex(management, PROPAGATED_TRAIT_NAMES_PROPERTY_KEY, UniqueKind.NONE, String.class, LIST, true, true);
createCommonVertexIndex(management, PENDING_TASKS_PROPERTY_KEY, UniqueKind.NONE, String.class, SET, true, false);
createCommonVertexIndex(management, IS_INCOMPLETE_PROPERTY_KEY, UniqueKind.NONE, Integer.class, SINGLE, true, true);
createCommonVertexIndex(management, CUSTOM_ATTRIBUTES_PROPERTY_KEY, UniqueKind.NONE, String.class, SINGLE, true, false);
createCommonVertexIndex(management, LABELS_PROPERTY_KEY, UniqueKind.NONE, String.class, SINGLE, true, false);
createCommonVertexIndex(management, ENTITY_DELETED_TIMESTAMP_PROPERTY_KEY, UniqueKind.NONE, Long.class, SINGLE, true, false);
createCommonVertexIndex(management, PATCH_ID_PROPERTY_KEY, UniqueKind.GLOBAL_UNIQUE, String.class, SINGLE, true, false);
createCommonVertexIndex(management, PATCH_DESCRIPTION_PROPERTY_KEY, UniqueKind.NONE, String.class, SINGLE, true, false);
createCommonVertexIndex(management, PATCH_TYPE_PROPERTY_KEY, UniqueKind.NONE, String.class, SINGLE, true, false);
createCommonVertexIndex(management, PATCH_ACTION_PROPERTY_KEY, UniqueKind.NONE, String.class, SINGLE, true, false);
createCommonVertexIndex(management, PATCH_STATE_PROPERTY_KEY, UniqueKind.NONE, String.class, SINGLE, true, false);
// tasks
createCommonVertexIndex(management, TASK_GUID, UniqueKind.GLOBAL_UNIQUE, String.class, SINGLE, true, false);
createCommonVertexIndex(management, TASK_TYPE_PROPERTY_KEY, UniqueKind.NONE, String.class, SINGLE, true, false);
createCommonVertexIndex(management, TASK_CREATED_TIME, UniqueKind.NONE, Long.class, SINGLE, true, false);
createCommonVertexIndex(management, TASK_STATUS, UniqueKind.NONE, String.class, SINGLE, true, false);
// index recovery
createCommonVertexIndex(management, PROPERTY_KEY_INDEX_RECOVERY_NAME, UniqueKind.GLOBAL_UNIQUE, String.class, SINGLE, true, false);
//metrics
createCommonVertexIndex(management," __AtlasMetricsStat.metricsId", UniqueKind.GLOBAL_UNIQUE, String.class, SINGLE, true, false);
createCommonVertexIndex(management," __AtlasMetricsStat.__u_metricsId", UniqueKind.GLOBAL_UNIQUE, String.class, SINGLE, true, false);
createCommonVertexIndex(management," __AtlasMetricsStat.metrics", UniqueKind.NONE, String.class, SINGLE, true, false);
createCommonVertexIndex(management," __AtlasMetricsStat.collectionTime", UniqueKind.GLOBAL_UNIQUE, String.class, SINGLE, true, false);
createCommonVertexIndex(management," __AtlasMetricsStat.timeToLiveMillis", UniqueKind.NONE, String.class, SINGLE, true, false);
// create vertex-centric index
createVertexCentricIndex(management, CLASSIFICATION_LABEL, AtlasEdgeDirection.BOTH, CLASSIFICATION_EDGE_NAME_PROPERTY_KEY, String.class, SINGLE);
createVertexCentricIndex(management, CLASSIFICATION_LABEL, AtlasEdgeDirection.BOTH, CLASSIFICATION_EDGE_IS_PROPAGATED_PROPERTY_KEY, Boolean.class, SINGLE);
createVertexCentricIndex(management, CLASSIFICATION_LABEL, AtlasEdgeDirection.BOTH, Arrays.asList(CLASSIFICATION_EDGE_NAME_PROPERTY_KEY, CLASSIFICATION_EDGE_IS_PROPAGATED_PROPERTY_KEY));
// create edge indexes
createEdgeIndex(management, RELATIONSHIP_GUID_PROPERTY_KEY, String.class, SINGLE, true);
createEdgeIndex(management, EDGE_ID_IN_IMPORT_KEY, String.class, SINGLE, true);
// create fulltext indexes
createFullTextIndex(management, ENTITY_TEXT_PROPERTY_KEY, String.class, SINGLE);
createPropertyKey(management, IS_PROXY_KEY, Boolean.class, SINGLE);
createPropertyKey(management, PROVENANCE_TYPE_KEY, Integer.class, SINGLE);
createPropertyKey(management, HOME_ID_KEY, String.class, SINGLE);
commit(management);
LOG.info("Index creation for global keys complete.");
} catch (Throwable t) {
LOG.error("GraphBackedSearchIndexer.initialize() failed", t);
rollback(management);
throw new RepositoryException(t);
}
}
private void resolveIndexFieldNames(AtlasGraphManagement managementSystem, ChangedTypeDefs changedTypeDefs) {
List<? extends AtlasBaseTypeDef> createdTypeDefs = changedTypeDefs.getCreatedTypeDefs();
if(createdTypeDefs != null) {
resolveIndexFieldNames(managementSystem, createdTypeDefs);
}
List<? extends AtlasBaseTypeDef> updatedTypeDefs = changedTypeDefs.getUpdatedTypeDefs();
if(updatedTypeDefs != null) {
resolveIndexFieldNames(managementSystem, updatedTypeDefs);
}
}
private void resolveIndexFieldNames(AtlasGraphManagement managementSystem, List<? extends AtlasBaseTypeDef> typeDefs) {
for(AtlasBaseTypeDef baseTypeDef: typeDefs) {
if(TypeCategory.ENTITY.equals(baseTypeDef.getCategory())) {
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(baseTypeDef.getName());
resolveIndexFieldNames(managementSystem, entityType);
} else if(TypeCategory.BUSINESS_METADATA.equals(baseTypeDef.getCategory())) {
AtlasBusinessMetadataType businessMetadataType = typeRegistry.getBusinessMetadataTypeByName(baseTypeDef.getName());
resolveIndexFieldNames(managementSystem, businessMetadataType);
} else {
LOG.debug("Ignoring type definition {}", baseTypeDef.getName());
}
}
}
private void resolveIndexFieldNames(AtlasGraphManagement managementSystem, AtlasStructType structType) {
for(AtlasAttribute attribute: structType.getAllAttributes().values()) {
resolveIndexFieldName(managementSystem, attribute);
}
}
private void resolveIndexFieldName(AtlasGraphManagement managementSystem, AtlasAttribute attribute) {
try {
if (attribute.getIndexFieldName() == null && TypeCategory.PRIMITIVE.equals(attribute.getAttributeType().getTypeCategory())) {
AtlasStructType definedInType = attribute.getDefinedInType();
AtlasAttribute baseInstance = definedInType != null ? definedInType.getAttribute(attribute.getName()) : null;
if (baseInstance != null && baseInstance.getIndexFieldName() != null) {
attribute.setIndexFieldName(baseInstance.getIndexFieldName());
} else if (isIndexApplicable(getPrimitiveClass(attribute.getTypeName()), toAtlasCardinality(attribute.getAttributeDef().getCardinality()))) {
AtlasPropertyKey propertyKey = managementSystem.getPropertyKey(attribute.getVertexPropertyName());
boolean isStringField = AtlasAttributeDef.IndexType.STRING.equals(attribute.getIndexType());
if (propertyKey != null) {
String indexFieldName = managementSystem.getIndexFieldName(Constants.VERTEX_INDEX, propertyKey, isStringField);
attribute.setIndexFieldName(indexFieldName);
if (baseInstance != null) {
baseInstance.setIndexFieldName(indexFieldName);
}
typeRegistry.addIndexFieldName(attribute.getVertexPropertyName(), indexFieldName);
LOG.info("Property {} is mapped to index field name {}", attribute.getQualifiedName(), attribute.getIndexFieldName());
} else {
LOG.warn("resolveIndexFieldName(attribute={}): propertyKey is null for vertextPropertyName={}", attribute.getQualifiedName(), attribute.getVertexPropertyName());
}
}
}
} catch (Exception excp) {
LOG.warn("resolveIndexFieldName(attribute={}) failed.", attribute.getQualifiedName(), excp);
}
}
private void createCommonVertexIndex(AtlasGraphManagement management,
String propertyName,
UniqueKind uniqueKind,
Class propertyClass,
AtlasCardinality cardinality,
boolean createCompositeIndex,
boolean createCompositeIndexWithTypeAndSuperTypes) {
createCommonVertexIndex(management, propertyName, uniqueKind, propertyClass, cardinality, createCompositeIndex, createCompositeIndexWithTypeAndSuperTypes, false);
}
private void createCommonVertexIndex(AtlasGraphManagement management,
String propertyName,
UniqueKind uniqueKind,
Class propertyClass,
AtlasCardinality cardinality,
boolean createCompositeIndex,
boolean createCompositeIndexWithTypeAndSuperTypes,
boolean isStringField) {
if(isStringField && String.class.equals(propertyClass)) {
propertyName = AtlasAttribute.VERTEX_PROPERTY_PREFIX_STRING_INDEX_TYPE +propertyName;
LOG.debug("Creating the common attribute '{}' as string field.", propertyName);
}
final String indexFieldName = createVertexIndex(management,
propertyName,
uniqueKind,
propertyClass,
cardinality,
createCompositeIndex,
createCompositeIndexWithTypeAndSuperTypes, isStringField);
if(indexFieldName != null) {
typeRegistry.addIndexFieldName(propertyName, indexFieldName);
}
}
private void addIndexForType(AtlasGraphManagement management, AtlasBaseTypeDef typeDef) {
if (typeDef instanceof AtlasEnumDef) {
// Only handle complex types like Struct, Classification and Entity
return;
}
if (typeDef instanceof AtlasStructDef) {
AtlasStructDef structDef = (AtlasStructDef) typeDef;
List<AtlasAttributeDef> attributeDefs = structDef.getAttributeDefs();
if (CollectionUtils.isNotEmpty(attributeDefs)) {
for (AtlasAttributeDef attributeDef : attributeDefs) {
createIndexForAttribute(management, structDef, attributeDef);
}
}
} else if (!AtlasTypeUtil.isBuiltInType(typeDef.getName())){
throw new IllegalArgumentException("bad data type" + typeDef.getName());
}
}
private void deleteIndexForType(AtlasGraphManagement management, AtlasBaseTypeDef typeDef) {
Preconditions.checkNotNull(typeDef, "Cannot process null typedef");
if (LOG.isDebugEnabled()) {
LOG.debug("Deleting indexes for type {}", typeDef.getName());
}
if (typeDef instanceof AtlasStructDef) {
AtlasStructDef structDef = (AtlasStructDef) typeDef;
List<AtlasAttributeDef> attributeDefs = structDef.getAttributeDefs();
if (CollectionUtils.isNotEmpty(attributeDefs)) {
for (AtlasAttributeDef attributeDef : attributeDefs) {
deleteIndexForAttribute(management, typeDef.getName(), attributeDef);
}
}
}
LOG.info("Completed deleting indexes for type {}", typeDef.getName());
}
private void createIndexForAttribute(AtlasGraphManagement management, AtlasStructDef structDef, AtlasAttributeDef attributeDef) {
String qualifiedName = AtlasAttribute.getQualifiedAttributeName(structDef, attributeDef.getName());
final String propertyName = AtlasAttribute.generateVertexPropertyName(structDef, attributeDef, qualifiedName);
AtlasCardinality cardinality = toAtlasCardinality(attributeDef.getCardinality());
boolean isUnique = attributeDef.getIsUnique();
boolean isIndexable = attributeDef.getIsIndexable();
String attribTypeName = attributeDef.getTypeName();
boolean isBuiltInType = AtlasTypeUtil.isBuiltInType(attribTypeName);
boolean isArrayType = isArrayType(attribTypeName);
boolean isMapType = isMapType(attribTypeName);
final String uniqPropName = isUnique ? AtlasGraphUtilsV2.encodePropertyKey(structDef.getName() + "." + UNIQUE_ATTRIBUTE_SHADE_PROPERTY_PREFIX + attributeDef.getName()) : null;
final AtlasAttributeDef.IndexType indexType = attributeDef.getIndexType();
try {
AtlasType atlasType = typeRegistry.getType(structDef.getName());
AtlasType attributeType = typeRegistry.getType(attribTypeName);
if (isClassificationType(attributeType)) {
LOG.warn("Ignoring non-indexable attribute {}", attribTypeName);
}
if (isArrayType) {
createLabelIfNeeded(management, propertyName, attribTypeName);
AtlasArrayType arrayType = (AtlasArrayType) attributeType;
boolean isReference = isReference(arrayType.getElementType());
if (!isReference) {
createPropertyKey(management, propertyName, ArrayList.class, SINGLE);
}
}
if (isMapType) {
createLabelIfNeeded(management, propertyName, attribTypeName);
AtlasMapType mapType = (AtlasMapType) attributeType;
boolean isReference = isReference(mapType.getValueType());
if (!isReference) {
createPropertyKey(management, propertyName, HashMap.class, SINGLE);
}
}
if (isEntityType(attributeType)) {
createEdgeLabel(management, propertyName);
} else if (isBuiltInType) {
if (isRelationshipType(atlasType)) {
createEdgeIndex(management, propertyName, getPrimitiveClass(attribTypeName), cardinality, false);
} else {
Class primitiveClassType = getPrimitiveClass(attribTypeName);
boolean isStringField = false;
if(primitiveClassType == String.class) {
isStringField = AtlasAttributeDef.IndexType.STRING.equals(indexType);
}
createVertexIndex(management, propertyName, UniqueKind.NONE, getPrimitiveClass(attribTypeName), cardinality, isIndexable, false, isStringField);
if (uniqPropName != null) {
createVertexIndex(management, uniqPropName, UniqueKind.PER_TYPE_UNIQUE, getPrimitiveClass(attribTypeName), cardinality, isIndexable, true, isStringField);
}
}
} else if (isEnumType(attributeType)) {
if (isRelationshipType(atlasType)) {
createEdgeIndex(management, propertyName, String.class, cardinality, false);
} else {
createVertexIndex(management, propertyName, UniqueKind.NONE, String.class, cardinality, isIndexable, false, false);
if (uniqPropName != null) {
createVertexIndex(management, uniqPropName, UniqueKind.PER_TYPE_UNIQUE, String.class, cardinality, isIndexable, true, false);
}
}
} else if (isStructType(attributeType)) {
AtlasStructDef attribureStructDef = typeRegistry.getStructDefByName(attribTypeName);
updateIndexForTypeDef(management, attribureStructDef);
}
} catch (AtlasBaseException e) {
LOG.error("No type exists for {}", attribTypeName, e);
}
}
private void deleteIndexForAttribute(AtlasGraphManagement management, String typeName, AtlasAttributeDef attributeDef) {
final String propertyName = AtlasGraphUtilsV2.encodePropertyKey(typeName + "." + attributeDef.getName());
try {
if (management.containsPropertyKey(propertyName)) {
LOG.info("Deleting propertyKey {}, for attribute {}.{}", propertyName, typeName, attributeDef.getName());
management.deletePropertyKey(propertyName);
}
} catch (Exception excp) {
LOG.warn("Failed to delete propertyKey {}, for attribute {}.{}", propertyName, typeName, attributeDef.getName());
}
}
/**
* gets the encoded property name for the attribute passed in.
* @param baseTypeDef the type system of the attribute
* @param attributeDef the attribute definition
* @return the encoded property name for the attribute passed in.
*/
public static String getEncodedPropertyName(AtlasStructDef baseTypeDef, AtlasAttributeDef attributeDef) {
return AtlasAttribute.getQualifiedAttributeName(baseTypeDef, attributeDef.getName());
}
private void createLabelIfNeeded(final AtlasGraphManagement management, final String propertyName, final String attribTypeName) {
// If any of the referenced typename is of type Entity or Struct then the edge label needs to be created
for (String typeName : AtlasTypeUtil.getReferencedTypeNames(attribTypeName)) {
if (typeRegistry.getEntityDefByName(typeName) != null || typeRegistry.getStructDefByName(typeName) != null) {
// Create the edge label upfront to avoid running into concurrent call issue (ATLAS-2092)
createEdgeLabel(management, propertyName);
}
}
}
private boolean isEntityType(AtlasType type) {
return type instanceof AtlasEntityType;
}
private boolean isClassificationType(AtlasType type) {
return type instanceof AtlasClassificationType;
}
private boolean isEnumType(AtlasType type) {
return type instanceof AtlasEnumType;
}
private boolean isStructType(AtlasType type) {
return type instanceof AtlasStructType;
}
private boolean isRelationshipType(AtlasType type) {
return type instanceof AtlasRelationshipType;
}
public Class getPrimitiveClass(String attribTypeName) {
String attributeTypeName = attribTypeName.toLowerCase();
switch (attributeTypeName) {
case ATLAS_TYPE_BOOLEAN:
return Boolean.class;
case ATLAS_TYPE_BYTE:
return Byte.class;
case ATLAS_TYPE_SHORT:
return Short.class;
case ATLAS_TYPE_INT:
return Integer.class;
case ATLAS_TYPE_LONG:
case ATLAS_TYPE_DATE:
return Long.class;
case ATLAS_TYPE_FLOAT:
return Float.class;
case ATLAS_TYPE_DOUBLE:
return Double.class;
case ATLAS_TYPE_BIGINTEGER:
return BigInteger.class;
case ATLAS_TYPE_BIGDECIMAL:
return BigDecimal.class;
case ATLAS_TYPE_STRING:
return String.class;
}
throw new IllegalArgumentException(String.format("Unknown primitive typename %s", attribTypeName));
}
public AtlasCardinality toAtlasCardinality(AtlasAttributeDef.Cardinality cardinality) {
switch (cardinality) {
case SINGLE:
return SINGLE;
case LIST:
return LIST;
case SET:
return SET;
}
// Should never reach this point
throw new IllegalArgumentException(String.format("Bad cardinality %s", cardinality));
}
private void createEdgeLabel(final AtlasGraphManagement management, final String propertyName) {
// Create the edge label upfront to avoid running into concurrent call issue (ATLAS-2092)
// ATLAS-2092 addresses this problem by creating the edge label upfront while type creation
// which resolves the race condition during the entity creation
String label = Constants.INTERNAL_PROPERTY_KEY_PREFIX + propertyName;
createEdgeLabelUsingLabelName(management, label);
}
private void createEdgeLabelUsingLabelName(final AtlasGraphManagement management, final String label) {
if (StringUtils.isEmpty(label)) {
return;
}
org.apache.atlas.repository.graphdb.AtlasEdgeLabel edgeLabel = management.getEdgeLabel(label);
if (edgeLabel == null) {
management.makeEdgeLabel(label);
LOG.info("Created edge label {} ", label);
}
}
private AtlasPropertyKey createPropertyKey(AtlasGraphManagement management, String propertyName, Class propertyClass, AtlasCardinality cardinality) {
AtlasPropertyKey propertyKey = management.getPropertyKey(propertyName);
if (propertyKey == null) {
propertyKey = management.makePropertyKey(propertyName, propertyClass, cardinality);
}
return propertyKey;
}
public String createVertexIndex(AtlasGraphManagement management, String propertyName, UniqueKind uniqueKind, Class propertyClass,
AtlasCardinality cardinality, boolean createCompositeIndex, boolean createCompositeIndexWithTypeAndSuperTypes, boolean isStringField) {
String indexFieldName = null;
if (propertyName != null) {
AtlasPropertyKey propertyKey = management.getPropertyKey(propertyName);
if (propertyKey == null) {
propertyKey = management.makePropertyKey(propertyName, propertyClass, cardinality);
if (isIndexApplicable(propertyClass, cardinality)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Creating backing index for vertex property {} of type {} ", propertyName, propertyClass.getName());
}
indexFieldName = management.addMixedIndex(VERTEX_INDEX, propertyKey, isStringField);
LOG.info("Created backing index for vertex property {} of type {} ", propertyName, propertyClass.getName());
}
}
if(indexFieldName == null && isIndexApplicable(propertyClass, cardinality)) {
indexFieldName = management.getIndexFieldName(VERTEX_INDEX, propertyKey, isStringField);
}
if (propertyKey != null) {
if (createCompositeIndex || uniqueKind == UniqueKind.GLOBAL_UNIQUE || uniqueKind == UniqueKind.PER_TYPE_UNIQUE) {
createVertexCompositeIndex(management, propertyClass, propertyKey, uniqueKind == UniqueKind.GLOBAL_UNIQUE);
}
if (createCompositeIndexWithTypeAndSuperTypes) {
createVertexCompositeIndexWithTypeName(management, propertyClass, propertyKey, uniqueKind == UniqueKind.PER_TYPE_UNIQUE);
createVertexCompositeIndexWithSuperTypeName(management, propertyClass, propertyKey);
}
} else {
LOG.warn("Index not created for {}: propertyKey is null", propertyName);
}
}
return indexFieldName;
}
private void createVertexCentricIndex(AtlasGraphManagement management, String edgeLabel, AtlasEdgeDirection edgeDirection,
String propertyName, Class propertyClass, AtlasCardinality cardinality) {
AtlasPropertyKey propertyKey = management.getPropertyKey(propertyName);
if (propertyKey == null) {
propertyKey = management.makePropertyKey(propertyName, propertyClass, cardinality);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Creating vertex-centric index for edge label: {} direction: {} for property: {} of type: {} ",
edgeLabel, edgeDirection.name(), propertyName, propertyClass.getName());
}
final String indexName = edgeLabel + propertyKey.getName();
if (!management.edgeIndexExist(edgeLabel, indexName)) {
management.createEdgeIndex(edgeLabel, indexName, edgeDirection, Collections.singletonList(propertyKey));
LOG.info("Created vertex-centric index for edge label: {} direction: {} for property: {} of type: {}",
edgeLabel, edgeDirection.name(), propertyName, propertyClass.getName());
}
}
private void createVertexCentricIndex(AtlasGraphManagement management, String edgeLabel, AtlasEdgeDirection edgeDirection, List<String> propertyNames) {
if (LOG.isDebugEnabled()) {
LOG.debug("Creating vertex-centric index for edge label: {} direction: {} for properties: {}",
edgeLabel, edgeDirection.name(), propertyNames);
}
String indexName = edgeLabel;
List<AtlasPropertyKey> propertyKeys = new ArrayList<>();
for (String propertyName : propertyNames) {
AtlasPropertyKey propertyKey = management.getPropertyKey(propertyName);
if (propertyKey != null) {
propertyKeys.add(propertyKey);
indexName = indexName + propertyKey.getName();
}
}
if (!management.edgeIndexExist(edgeLabel, indexName) && CollectionUtils.isNotEmpty(propertyKeys)) {
management.createEdgeIndex(edgeLabel, indexName, edgeDirection, propertyKeys);
LOG.info("Created vertex-centric index for edge label: {} direction: {} for properties: {}", edgeLabel, edgeDirection.name(), propertyNames);
}
}
private void createEdgeIndex(AtlasGraphManagement management, String propertyName, Class propertyClass,
AtlasCardinality cardinality, boolean createCompositeIndex) {
if (propertyName != null) {
AtlasPropertyKey propertyKey = management.getPropertyKey(propertyName);
if (propertyKey == null) {
propertyKey = management.makePropertyKey(propertyName, propertyClass, cardinality);
if (isIndexApplicable(propertyClass, cardinality)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Creating backing index for edge property {} of type {} ", propertyName, propertyClass.getName());
}
management.addMixedIndex(EDGE_INDEX, propertyKey, false);
LOG.info("Created backing index for edge property {} of type {} ", propertyName, propertyClass.getName());
}
}
if (propertyKey != null) {
if (createCompositeIndex) {
createEdgeCompositeIndex(management, propertyClass, propertyKey);
}
} else {
LOG.warn("Index not created for {}: propertyKey is null", propertyName);
}
}
}
private AtlasPropertyKey createFullTextIndex(AtlasGraphManagement management, String propertyName, Class propertyClass,
AtlasCardinality cardinality) {
AtlasPropertyKey propertyKey = management.getPropertyKey(propertyName);
if (propertyKey == null) {
propertyKey = management.makePropertyKey(propertyName, propertyClass, cardinality);
if (isIndexApplicable(propertyClass, cardinality)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Creating backing index for vertex property {} of type {} ", propertyName, propertyClass.getName());
}
management.addMixedIndex(FULLTEXT_INDEX, propertyKey, false);
LOG.info("Created backing index for vertex property {} of type {} ", propertyName, propertyClass.getName());
}
LOG.info("Created index {}", FULLTEXT_INDEX);
}
return propertyKey;
}
private void createVertexCompositeIndex(AtlasGraphManagement management, Class propertyClass, AtlasPropertyKey propertyKey,
boolean enforceUniqueness) {
String propertyName = propertyKey.getName();
if (LOG.isDebugEnabled()) {
LOG.debug("Creating composite index for property {} of type {}; isUnique={} ", propertyName, propertyClass.getName(), enforceUniqueness);
}
AtlasGraphIndex existingIndex = management.getGraphIndex(propertyName);
if (existingIndex == null) {
management.createVertexCompositeIndex(propertyName, enforceUniqueness, Collections.singletonList(propertyKey));
LOG.info("Created composite index for property {} of type {}; isUnique={} ", propertyName, propertyClass.getName(), enforceUniqueness);
}
}
private void createEdgeCompositeIndex(AtlasGraphManagement management, Class propertyClass, AtlasPropertyKey propertyKey) {
String propertyName = propertyKey.getName();
if (LOG.isDebugEnabled()) {
LOG.debug("Creating composite index for property {} of type {}", propertyName, propertyClass.getName());
}
AtlasGraphIndex existingIndex = management.getGraphIndex(propertyName);
if (existingIndex == null) {
management.createEdgeCompositeIndex(propertyName, false, Collections.singletonList(propertyKey));
LOG.info("Created composite index for property {} of type {}", propertyName, propertyClass.getName());
}
}
private void createVertexCompositeIndexWithTypeName(AtlasGraphManagement management, Class propertyClass, AtlasPropertyKey propertyKey, boolean isUnique) {
createVertexCompositeIndexWithSystemProperty(management, propertyClass, propertyKey, ENTITY_TYPE_PROPERTY_KEY, SINGLE, isUnique);
}
private void createVertexCompositeIndexWithSuperTypeName(AtlasGraphManagement management, Class propertyClass, AtlasPropertyKey propertyKey) {
createVertexCompositeIndexWithSystemProperty(management, propertyClass, propertyKey, SUPER_TYPES_PROPERTY_KEY, SET, false);
}
private void createVertexCompositeIndexWithSystemProperty(AtlasGraphManagement management, Class propertyClass, AtlasPropertyKey propertyKey,
final String systemPropertyKey, AtlasCardinality cardinality, boolean isUnique) {
if (LOG.isDebugEnabled()) {
LOG.debug("Creating composite index for property {} of type {} and {}", propertyKey.getName(), propertyClass.getName(), systemPropertyKey);
}
AtlasPropertyKey typePropertyKey = management.getPropertyKey(systemPropertyKey);
if (typePropertyKey == null) {
typePropertyKey = management.makePropertyKey(systemPropertyKey, String.class, cardinality);
}
final String indexName = propertyKey.getName() + systemPropertyKey;
AtlasGraphIndex existingIndex = management.getGraphIndex(indexName);
if (existingIndex == null) {
List<AtlasPropertyKey> keys = new ArrayList<>(2);
keys.add(typePropertyKey);
keys.add(propertyKey);
management.createVertexCompositeIndex(indexName, isUnique, keys);
LOG.info("Created composite index for property {} of type {} and {}", propertyKey.getName(), propertyClass.getName(), systemPropertyKey);
}
}
private boolean isIndexApplicable(Class propertyClass, AtlasCardinality cardinality) {
return !(INDEX_EXCLUSION_CLASSES.contains(propertyClass) || cardinality.isMany());
}
public void commit(AtlasGraphManagement management) throws IndexException {
try {
management.commit();
recomputeIndexedKeys = true;
} catch (Exception e) {
LOG.error("Index commit failed", e);
throw new IndexException("Index commit failed ", e);
}
}
public void rollback(AtlasGraphManagement management) throws IndexException {
try {
management.rollback();
recomputeIndexedKeys = true;
} catch (Exception e) {
LOG.error("Index rollback failed ", e);
throw new IndexException("Index rollback failed ", e);
}
}
private void attemptRollback(ChangedTypeDefs changedTypeDefs, AtlasGraphManagement management)
throws AtlasBaseException {
if (null != management) {
try {
rollback(management);
} catch (IndexException e) {
LOG.error("Index rollback has failed", e);
throw new AtlasBaseException(AtlasErrorCode.INDEX_ROLLBACK_FAILED, e,
changedTypeDefs.toString());
}
}
}
private void updateIndexForTypeDef(AtlasGraphManagement management, AtlasBaseTypeDef typeDef) {
Preconditions.checkNotNull(typeDef, "Cannot index on null typedefs");
if (LOG.isDebugEnabled()) {
LOG.debug("Creating indexes for type name={}, definition={}", typeDef.getName(), typeDef.getClass());
}
addIndexForType(management, typeDef);
LOG.info("Index creation for type {} complete", typeDef.getName());
}
private void notifyChangeListeners(ChangedTypeDefs changedTypeDefs) {
for (IndexChangeListener indexChangeListener : indexChangeListeners) {
try {
indexChangeListener.onChange(changedTypeDefs);
} catch (Throwable t) {
LOG.error("Error encountered in notifying the index change listener {}.", indexChangeListener.getClass().getName(), t);
//we need to throw exception if any of the listeners throw execption.
throw new RuntimeException("Error encountered in notifying the index change listener " + indexChangeListener.getClass().getName(), t);
}
}
}
private void notifyInitializationStart() {
for (IndexChangeListener indexChangeListener : indexChangeListeners) {
try {
indexChangeListener.onInitStart();
} catch (Throwable t) {
LOG.error("Error encountered in notifying the index change listener {}.", indexChangeListener.getClass().getName(), t);
//we need to throw exception if any of the listeners throw execption.
throw new RuntimeException("Error encountered in notifying the index change listener " + indexChangeListener.getClass().getName(), t);
}
}
}
private void notifyInitializationCompletion(ChangedTypeDefs changedTypeDefs) {
for (IndexChangeListener indexChangeListener : indexChangeListeners) {
try {
indexChangeListener.onInitCompletion(changedTypeDefs);
} catch (Throwable t) {
LOG.error("Error encountered in notifying the index change listener {}.", indexChangeListener.getClass().getName(), t);
//we need to throw exception if any of the listeners throw execption.
throw new RuntimeException("Error encountered in notifying the index change listener " + indexChangeListener.getClass().getName(), t);
}
}
}
private void createEdgeLabels(AtlasGraphManagement management, List<? extends AtlasBaseTypeDef> typeDefs) {
if (CollectionUtils.isEmpty(typeDefs)) {
return;
}
for (AtlasBaseTypeDef typeDef : typeDefs) {
if (typeDef instanceof AtlasEntityDef) {
AtlasEntityDef entityDef = (AtlasEntityDef) typeDef;
createEdgeLabelsForStruct(management, entityDef);
} else if (typeDef instanceof AtlasRelationshipDef) {
createEdgeLabels(management, (AtlasRelationshipDef) typeDef);
}
}
}
private void createEdgeLabelsForStruct(AtlasGraphManagement management, AtlasEntityDef entityDef) {
try {
AtlasType type = typeRegistry.getType(entityDef.getName());
if (!(type instanceof AtlasEntityType)) {
return;
}
AtlasEntityType entityType = (AtlasEntityType) type;
for (AtlasAttributeDef attributeDef : entityDef.getAttributeDefs()) {
AtlasAttribute attribute = entityType.getAttribute(attributeDef.getName());
if (attribute.getAttributeType().getTypeCategory() == TypeCategory.STRUCT) {
String relationshipLabel = attribute.getRelationshipEdgeLabel();
createEdgeLabelUsingLabelName(management, relationshipLabel);
}
}
} catch (AtlasBaseException e) {
LOG.error("Error fetching type: {}", entityDef.getName(), e);
}
}
private void createEdgeLabels(AtlasGraphManagement management, AtlasRelationshipDef relationshipDef) {
String relationshipTypeName = relationshipDef.getName();
AtlasRelationshipType relationshipType = typeRegistry.getRelationshipTypeByName(relationshipTypeName);
String relationshipLabel = relationshipType.getRelationshipLabel();
createEdgeLabelUsingLabelName(management, relationshipLabel);
}
}