blob: c30bd25968ae6356cb69e3e67c619d70a5d19633 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.patches;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
import org.apache.atlas.pc.WorkItemManager;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.IndexException;
import org.apache.atlas.repository.graph.GraphBackedSearchIndexer.UniqueKind;
import org.apache.atlas.repository.graphdb.AtlasCardinality;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasGraphManagement;
import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.Iterator;
import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.APPLIED;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getIdFromVertex;
public class UniqueAttributePatch extends AtlasPatchHandler {
private static final Logger LOG = LoggerFactory.getLogger(UniqueAttributePatch.class);
private static final String PATCH_ID = "JAVA_PATCH_0000_001";
private static final String PATCH_DESCRIPTION = "Add __u_ property for each unique attribute of active entities";
private final PatchContext context;
public UniqueAttributePatch(PatchContext context) {
super(context.getPatchRegistry(), PATCH_ID, PATCH_DESCRIPTION);
this.context = context;
}
@Override
public void apply() throws AtlasBaseException {
ConcurrentPatchProcessor patchProcessor = new UniqueAttributePatchProcessor(context);
patchProcessor.apply();
setStatus(APPLIED);
LOG.info("UniqueAttributePatch.apply(): patchId={}, status={}", getPatchId(), getStatus());
}
public static class UniqueAttributePatchProcessor extends ConcurrentPatchProcessor {
public UniqueAttributePatchProcessor(PatchContext context) {
super(context);
}
@Override
protected void prepareForExecution() {
//create the new attribute for all unique attributes.
createIndexForUniqueAttributes();
}
@Override
public void submitVerticesToUpdate(WorkItemManager manager) {
AtlasTypeRegistry typeRegistry = getTypeRegistry();
AtlasGraph graph = getGraph();
for (AtlasEntityType entityType : typeRegistry.getAllEntityTypes()) {
LOG.info("finding entities of type {}", entityType.getTypeName());
Iterable<Object> iterable = graph.query().has(Constants.ENTITY_TYPE_PROPERTY_KEY, entityType.getTypeName()).vertexIds();
int count = 0;
for (Iterator<Object> iter = iterable.iterator(); iter.hasNext(); ) {
Object vertexId = iter.next();
manager.checkProduce((Long) vertexId);
count++;
}
LOG.info("found {} entities of type {}", count, entityType.getTypeName());
}
}
@Override
protected void processVertexItem(Long vertexId, AtlasVertex vertex, String typeName, AtlasEntityType entityType) {
//process the vertex
processItem(vertexId, vertex, typeName, entityType);
}
private void createIndexForUniqueAttributes() {
for (AtlasEntityType entityType : getTypeRegistry().getAllEntityTypes()) {
String typeName = entityType.getTypeName();
Collection<AtlasAttribute> uniqAttributes = entityType.getUniqAttributes().values();
if (CollectionUtils.isEmpty(uniqAttributes)) {
LOG.info("UniqueAttributePatchProcessor.apply(): no unique attribute for entity-type {}", typeName);
continue;
}
createIndexForUniqueAttributes(typeName, uniqAttributes);
}
}
private void createIndexForUniqueAttributes(String typeName, Collection<AtlasAttribute> attributes) {
try {
AtlasGraphManagement management = getGraph().getManagementSystem();
for (AtlasAttribute attribute : attributes) {
String uniquePropertyName = attribute.getVertexUniquePropertyName();
if (management.getPropertyKey(uniquePropertyName) != null) {
continue;
}
AtlasAttributeDef attributeDef = attribute.getAttributeDef();
boolean isIndexable = attributeDef.getIsIndexable();
String attribTypeName = attributeDef.getTypeName();
Class propertyClass = getIndexer().getPrimitiveClass(attribTypeName);
AtlasCardinality cardinality = getIndexer().toAtlasCardinality(attributeDef.getCardinality());
getIndexer().createVertexIndex(management,
uniquePropertyName,
UniqueKind.PER_TYPE_UNIQUE,
propertyClass,
cardinality,
isIndexable,
true,
AtlasAttributeDef.IndexType.STRING.equals(attribute.getIndexType()));
}
getIndexer().commit(management);
getGraph().commit();
LOG.info("Unique attributes: type: {}: Registered!", typeName);
} catch (IndexException e) {
LOG.error("Error creating index: type: {}", typeName, e);
}
}
protected void processItem(Long vertexId, AtlasVertex vertex, String typeName, AtlasEntityType entityType) {
LOG.debug("processItem(typeName={}, vertexId={})", typeName, vertexId);
processIndexStringAttribute(vertexId, vertex, typeName, entityType);
if (AtlasGraphUtilsV2.getState(vertex) == AtlasEntity.Status.ACTIVE) {
processUniqueAttribute(vertexId, vertex, typeName, entityType);
}
LOG.debug("processItem(typeName={}, vertexId={}): Done!", typeName, vertexId);
}
private void processIndexStringAttribute(Long vertexId, AtlasVertex vertex, String typeName, AtlasEntityType entityType) {
for (AtlasAttribute attribute : entityType.getAllAttributes().values()) {
if (attribute.getAttributeDef().getIndexType() != null &&
attribute.getAttributeDef().getIndexType() == AtlasAttributeDef.IndexType.STRING) {
String vertexPropertyName = attribute.getVertexPropertyName();
if (vertex.getProperty(vertexPropertyName, String.class) != null) {
continue;
}
Object attrVal = AtlasGraphUtilsV2.getEncodedProperty(vertex, attribute.getQualifiedName(), String.class);
if (attrVal != null) {
AtlasGraphUtilsV2.setEncodedProperty(vertex, vertexPropertyName, attrVal);
}
}
}
LOG.debug("processIndexStringAttribute(typeName={}, vertexId={}): Done!", typeName, vertexId);
}
private void processUniqueAttribute(Long vertexId, AtlasVertex vertex, String typeName, AtlasEntityType entityType) {
for (AtlasAttribute attribute : entityType.getUniqAttributes().values()) {
String uniquePropertyKey = attribute.getVertexUniquePropertyName();
Collection<? extends String> propertyKeys = vertex.getPropertyKeys();
Object uniqAttrValue = null;
if (propertyKeys == null || !propertyKeys.contains(uniquePropertyKey)) {
try {
String propertyKey = attribute.getVertexPropertyName();
uniqAttrValue = EntityGraphRetriever.mapVertexToPrimitive(vertex, propertyKey, attribute.getAttributeDef());
AtlasGraphUtilsV2.setEncodedProperty(vertex, uniquePropertyKey, uniqAttrValue);
} catch (AtlasSchemaViolationException ex) {
LOG.error("Duplicates detected: {}:{}:{}", typeName, uniqAttrValue, getIdFromVertex(vertex));
vertex.removeProperty(uniquePropertyKey);
}
}
}
LOG.debug("processUniqueAttribute(typeName={}, vertexId={}): Done!", typeName, vertexId);
}
}
}