ATLAS-4181: Provide option to add mandatory attribute to existing entity definition
Signed-off-by: Madhan Neethiraj <madhan@apache.org>
diff --git a/common/src/main/java/org/apache/atlas/repository/Constants.java b/common/src/main/java/org/apache/atlas/repository/Constants.java
index 61abfca..771287f 100644
--- a/common/src/main/java/org/apache/atlas/repository/Constants.java
+++ b/common/src/main/java/org/apache/atlas/repository/Constants.java
@@ -203,6 +203,11 @@
public static final Integer INCOMPLETE_ENTITY_VALUE = Integer.valueOf(1);
/*
+ * typedef patch constants
+ */
+ public static final String TYPEDEF_PATCH_ADD_MANDATORY_ATTRIBUTE = "ADD_MANDATORY_ATTRIBUTE";
+
+ /*
* All supported file-format extensions for Bulk Imports through file upload
*/
public enum SupportedFileExtensions { XLSX, XLS, CSV }
diff --git a/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/AtlasSolrQueryBuilderTest.java b/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/AtlasSolrQueryBuilderTest.java
index 06c7221..c2acc5b 100644
--- a/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/AtlasSolrQueryBuilderTest.java
+++ b/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/AtlasSolrQueryBuilderTest.java
@@ -200,7 +200,7 @@
processSearchParameters(fileName, underTest);
- Assert.assertEquals(underTest.build(), "+t10 AND -__state_index:DELETED AND +__typeName__index:(hive_table ) AND ( ( +created__index:[ * TO100} ) )");
+ Assert.assertEquals(underTest.build(), "+t10 AND -__state_index:DELETED AND +__typeName__index:(hive_table ) AND ( ( +created__index:[ * TO 100} ) )");
}
@Test
diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/AddMandatoryAttributesPatch.java b/repository/src/main/java/org/apache/atlas/repository/patches/AddMandatoryAttributesPatch.java
new file mode 100644
index 0000000..3102516
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/patches/AddMandatoryAttributesPatch.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.patches;
+
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
+import org.apache.atlas.pc.WorkItemManager;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
+import org.apache.commons.collections.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Iterator;
+import java.util.Set;
+
+import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.APPLIED;
+import static org.apache.atlas.repository.Constants.ENTITY_TYPE_PROPERTY_KEY;
+
+public class AddMandatoryAttributesPatch extends AtlasPatchHandler {
+ private static final Logger LOG = LoggerFactory.getLogger(AddMandatoryAttributesPatch.class);
+
+ private static final String PATCH_ID = "JAVA_PATCH_0000_008";
+ private static final String PATCH_DESCRIPTION = "Add mandatory attributes for all existing entities for given typeName";
+
+ private final PatchContext context;
+ private final String typeName;
+ private final List<AtlasAttributeDef> attributesToAdd;
+
+ public AddMandatoryAttributesPatch(PatchContext context, String typedefPatchId, String typeName, List<AtlasAttributeDef> attributesToAdd) {
+ super(context.getPatchRegistry(), PATCH_ID + "_" + typedefPatchId, PATCH_DESCRIPTION);
+
+ this.context = context;
+ this.typeName = typeName;
+ this.attributesToAdd = attributesToAdd;
+ }
+
+ @Override
+ public void apply() throws AtlasBaseException {
+ LOG.info("==> MandatoryAttributePatch.apply(): patchId={}", getPatchId());
+
+ ConcurrentPatchProcessor patchProcessor = new AddMandatoryAttributesPatchProcessor(context, typeName, attributesToAdd);
+
+ patchProcessor.apply();
+
+ setStatus(APPLIED);
+
+ LOG.info("<== MandatoryAttributePatch.apply(): patchId={}, status={}", getPatchId(), getStatus());
+ }
+
+ public static class AddMandatoryAttributesPatchProcessor extends ConcurrentPatchProcessor {
+ private final String typeName;
+ private final Set<String> typeAndAllSubTypes;
+ private final List<AtlasAttributeDef> attributesToAdd;
+
+ public AddMandatoryAttributesPatchProcessor(PatchContext context, String typeName, List<AtlasAttributeDef> attributesToAdd) {
+ super(context);
+
+ AtlasEntityType entityType = getTypeRegistry().getEntityTypeByName(typeName);
+
+ this.typeName = typeName;
+ this.attributesToAdd = attributesToAdd;
+
+ if (entityType != null) {
+ this.typeAndAllSubTypes = entityType.getTypeAndAllSubTypes();
+ } else {
+ LOG.warn("AddMandatoryAttributesPatchProcessor(): failed to find entity-type {}", typeName);
+
+ this.typeAndAllSubTypes = Collections.emptySet();
+ }
+ }
+
+ @Override
+ public void submitVerticesToUpdate(WorkItemManager manager) {
+ if (CollectionUtils.isNotEmpty(typeAndAllSubTypes)) {
+ LOG.info("Entity types to be updated with mandatory attributes: {}", typeAndAllSubTypes.size());
+
+ for (String typeName : typeAndAllSubTypes) {
+ LOG.info("finding entities of type {}", typeName);
+
+ AtlasGraph graph = getGraph();
+ Iterable<Object> vertexIds = graph.query().has(ENTITY_TYPE_PROPERTY_KEY, typeName).vertexIds();
+ int count = 0;
+
+ for (Iterator<Object> iterator = vertexIds.iterator(); iterator.hasNext(); ) {
+ Object vertexId = iterator.next();
+
+ manager.checkProduce(vertexId);
+
+ count++;
+ }
+
+ LOG.info("found {} entities of type {}", count, typeName);
+ }
+ }
+ }
+
+ @Override
+ protected void processVertexItem(Long vertexId, AtlasVertex vertex, String typeName, AtlasEntityType entityType) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> AddMandatoryAttributesPatchProcessor.processVertexItem(typeName={}, vertexId={})", typeName, vertexId);
+ }
+
+ for (AtlasAttributeDef attributeDef : attributesToAdd) {
+ AtlasAttribute attribute = entityType.getAttribute(attributeDef.getName());
+
+ if (attribute != null) {
+ Object existingValue = vertex.getProperty(attribute.getVertexPropertyName(), Object.class);
+
+ if (existingValue == null) {
+ vertex.setProperty(attribute.getVertexPropertyName(), attributeDef.getDefaultValue());
+ }
+ }
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== AddMandatoryAttributesPatchProcessor.processVertexItem(typeName={}, vertexId={})", typeName, vertexId);
+ }
+ }
+
+ @Override
+ protected void prepareForExecution() {
+ //do nothing
+ }
+ }
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java b/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java
index 676a0aa..89e9422 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java
@@ -28,6 +28,7 @@
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.listener.ActiveStateChangeHandler;
+import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.model.patches.AtlasPatch.PatchStatus;
import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
import org.apache.atlas.model.typedef.AtlasBusinessMetadataDef;
@@ -41,8 +42,10 @@
import org.apache.atlas.model.typedef.AtlasStructDef;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
import org.apache.atlas.model.typedef.AtlasTypesDef;
+import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.patches.AddMandatoryAttributesPatch;
import org.apache.atlas.repository.patches.SuperTypesUpdatePatch;
import org.apache.atlas.repository.patches.AtlasPatchManager;
import org.apache.atlas.repository.patches.AtlasPatchRegistry;
@@ -454,7 +457,8 @@
new UpdateTypeDefOptionsPatchHandler(typeDefStore, typeRegistry),
new SetServiceTypePatchHandler(typeDefStore, typeRegistry),
new UpdateAttributeMetadataHandler(typeDefStore, typeRegistry),
- new AddSuperTypePatchHandler(typeDefStore, typeRegistry)
+ new AddSuperTypePatchHandler(typeDefStore, typeRegistry),
+ new AddMandatoryAttributePatchHandler(typeDefStore, typeRegistry)
};
Map<String, PatchHandler> patchHandlerRegistry = new HashMap<>();
@@ -787,6 +791,113 @@
}
}
+ class AddMandatoryAttributePatchHandler extends PatchHandler {
+ public AddMandatoryAttributePatchHandler(AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry) {
+ super(typeDefStore, typeRegistry, new String[] { Constants.TYPEDEF_PATCH_ADD_MANDATORY_ATTRIBUTE });
+ }
+
+ @Override
+ public PatchStatus applyPatch(TypeDefPatch patch) throws AtlasBaseException {
+ String typeName = patch.getTypeName();
+ AtlasBaseTypeDef typeDef = typeRegistry.getTypeDefByName(typeName);
+ PatchStatus ret;
+
+ if (typeDef == null) {
+ throw new AtlasBaseException(AtlasErrorCode.PATCH_FOR_UNKNOWN_TYPE, patch.getAction(), typeName);
+ }
+
+ if (isPatchApplicable(patch, typeDef)) {
+ List<AtlasAttributeDef> attributesToAdd = getAttributesToAdd(patch, (AtlasStructDef) typeDef);
+
+ if (CollectionUtils.isEmpty(attributesToAdd)) {
+ LOG.info("patch skipped: typeName={}; mandatory attributes are not valid in patch {}",patch.getTypeName(), patch.getId());
+
+ ret = SKIPPED;
+ } else {
+ try {
+ RequestContext.get().setInTypePatching(true);
+
+ RequestContext.get().setCurrentTypePatchAction(Constants.TYPEDEF_PATCH_ADD_MANDATORY_ATTRIBUTE);
+
+ if (typeDef.getClass().equals(AtlasEntityDef.class)) {
+ AtlasEntityDef updatedDef = new AtlasEntityDef((AtlasEntityDef) typeDef);
+
+ updateTypeDefWithPatch(patch, updatedDef, attributesToAdd);
+
+ typeDefStore.updateEntityDefByName(typeName, updatedDef);
+ } else if (typeDef.getClass().equals(AtlasClassificationDef.class)) {
+ AtlasClassificationDef updatedDef = new AtlasClassificationDef((AtlasClassificationDef) typeDef);
+
+ updateTypeDefWithPatch(patch, updatedDef, attributesToAdd);
+
+ typeDefStore.updateClassificationDefByName(typeName, updatedDef);
+ } else if (typeDef.getClass().equals(AtlasStructDef.class)) {
+ AtlasStructDef updatedDef = new AtlasStructDef((AtlasStructDef) typeDef);
+
+ updateTypeDefWithPatch(patch, updatedDef, attributesToAdd);
+
+ typeDefStore.updateStructDefByName(typeName, updatedDef);
+ } else {
+ throw new AtlasBaseException(AtlasErrorCode.PATCH_NOT_APPLICABLE_FOR_TYPE, patch.getAction(), typeDef.getClass().getSimpleName());
+ }
+
+ LOG.info("adding a Java patch to update entities of {} with new mandatory attributes", typeName);
+
+ // Java patch handler to add mandatory attributes
+ patchManager.addPatchHandler(new AddMandatoryAttributesPatch(patchManager.getContext(), patch.getId(), typeName, attributesToAdd));
+
+ ret = APPLIED;
+ } finally {
+ RequestContext.get().setInTypePatching(false);
+
+ RequestContext.clear();
+ }
+ }
+ } else {
+ LOG.info("patch skipped: typeName={}; applyToVersion={}; updateToVersion={}",
+ patch.getTypeName(), patch.getApplyToVersion(), patch.getUpdateToVersion());
+
+ ret = SKIPPED;
+
+ }
+
+ return ret;
+ }
+
+ // Validate mandatory attribute with non-empty default value if PRIMITIVE, not unique and doesn't exists
+ private List<AtlasAttributeDef> getAttributesToAdd(TypeDefPatch patch, AtlasStructDef updatedDef) throws AtlasBaseException {
+ List<AtlasAttributeDef> ret = new ArrayList<>();
+
+ for (AtlasAttributeDef attributeDef : patch.getAttributeDefs()) {
+ TypeCategory attributeType = typeRegistry.getType(attributeDef.getTypeName()).getTypeCategory();
+
+ if (updatedDef.hasAttribute(attributeDef.getName())) {
+ LOG.warn("AddMandatoryAttributePatchHandler(id={}, typeName={}, attribute={}): already exists in type {}. Ignoring attribute", patch.getId(), patch.getTypeName(), attributeDef.getName(), updatedDef.getName());
+ } else if (attributeDef.getIsOptional()) {
+ LOG.warn("AddMandatoryAttributePatchHandler(id={}, typeName={}, attribute={}): is not mandatory attribute. Ignoring attribute", patch.getId(), patch.getTypeName(), attributeDef.getName());
+ } else if (StringUtils.isEmpty(attributeDef.getDefaultValue())) {
+ LOG.warn("AddMandatoryAttributePatchHandler(id={}, typeName={}, attribute={}): default value is missing. Ignoring attribute", patch.getId(), patch.getTypeName(), attributeDef.getName());
+ } else if (!TypeCategory.PRIMITIVE.equals(attributeType)) {
+ LOG.warn("AddMandatoryAttributePatchHandler(id={}, typeName={}, attribute={}): type {} is not primitive. Ignoring attribute", patch.getId(), patch.getTypeName(), attributeDef.getName(), attributeDef.getTypeName());
+ } else if (attributeDef.getIsUnique()) {
+ LOG.warn("AddMandatoryAttributePatchHandler(id={}, typeName={}, attribute={}): is not unique. Ignoring attribute", patch.getId(), patch.getTypeName(), attributeDef.getName());
+ } else {
+ ret.add(attributeDef);
+ }
+ }
+
+ return ret;
+ }
+
+ private void updateTypeDefWithPatch(TypeDefPatch patch, AtlasStructDef updatedDef, List<AtlasAttributeDef> attributesToAdd) {
+ for (AtlasAttributeDef attributeDef : attributesToAdd) {
+ updatedDef.addAttribute(attributeDef);
+ }
+
+ updatedDef.setTypeVersion(patch.getUpdateToVersion());
+ }
+ }
+
class UpdateAttributePatchHandler extends PatchHandler {
public UpdateAttributePatchHandler(AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry) {
super(typeDefStore, typeRegistry, new String[] { "UPDATE_ATTRIBUTE" });
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasStructDefStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasStructDefStoreV2.java
index 0c13a78..27dae16 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasStructDefStoreV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasStructDefStoreV2.java
@@ -442,8 +442,8 @@
continue;
}
- // new attribute - only allow if optional
- if (!attributeDef.getIsOptional()) {
+ // new attribute - allow optional by default or allow mandatory only with typedef patch ADD_MANDATORY_ATTRIBUTE
+ if (!attributeDef.getIsOptional() && !isInAddMandatoryAttributePatch()) {
throw new AtlasBaseException(AtlasErrorCode.CANNOT_ADD_MANDATORY_ATTRIBUTE, structDef.getName(), attributeDef.getName());
}
}
@@ -470,6 +470,11 @@
AtlasGraphUtilsV2.setEncodedProperty(vertex, encodedStructDefPropertyKey, attrNames);
}
+ public static boolean isInAddMandatoryAttributePatch() {
+ return RequestContext.get().isInTypePatching() &&
+ StringUtils.equals(Constants.TYPEDEF_PATCH_ADD_MANDATORY_ATTRIBUTE, RequestContext.get().getCurrentTypePatchAction());
+ }
+
public static void updateVertexAddReferences(AtlasStructDef structDef, AtlasVertex vertex,
AtlasTypeDefGraphStoreV2 typeDefStore) throws AtlasBaseException {
for (AtlasAttributeDef attributeDef : structDef.getAttributeDefs()) {
diff --git a/server-api/src/main/java/org/apache/atlas/RequestContext.java b/server-api/src/main/java/org/apache/atlas/RequestContext.java
index 7de3536..37d23c2 100644
--- a/server-api/src/main/java/org/apache/atlas/RequestContext.java
+++ b/server-api/src/main/java/org/apache/atlas/RequestContext.java
@@ -75,6 +75,7 @@
private boolean isInTypePatching = false;
private boolean createShellEntityForNonExistingReference = false;
private boolean skipFailedEntities = false;
+ private String currentTypePatchAction = "";
private RequestContext() {
}
@@ -237,6 +238,14 @@
this.skipFailedEntities = skipFailedEntities;
}
+ public String getCurrentTypePatchAction() {
+ return currentTypePatchAction;
+ }
+
+ public void setCurrentTypePatchAction(String currentTypePatchAction) {
+ this.currentTypePatchAction = currentTypePatchAction;
+ }
+
public void recordEntityUpdate(AtlasEntityHeader entity) {
if (entity != null && entity.getGuid() != null && ! entitiesToSkipUpdate.contains(entity.getGuid())) {
updatedEntities.put(entity.getGuid(), entity);