ATLAS-4181: Provide option to add mandatory attribute to existing entity definition

Signed-off-by: Madhan Neethiraj <madhan@apache.org>
diff --git a/common/src/main/java/org/apache/atlas/repository/Constants.java b/common/src/main/java/org/apache/atlas/repository/Constants.java
index 61abfca..771287f 100644
--- a/common/src/main/java/org/apache/atlas/repository/Constants.java
+++ b/common/src/main/java/org/apache/atlas/repository/Constants.java
@@ -203,6 +203,11 @@
     public static final Integer INCOMPLETE_ENTITY_VALUE   = Integer.valueOf(1);
 
     /*
+     * typedef patch constants
+     */
+    public static final String  TYPEDEF_PATCH_ADD_MANDATORY_ATTRIBUTE   = "ADD_MANDATORY_ATTRIBUTE";
+
+    /*
      * All supported file-format extensions for Bulk Imports through file upload
      */
     public enum SupportedFileExtensions { XLSX, XLS, CSV }
diff --git a/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/AtlasSolrQueryBuilderTest.java b/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/AtlasSolrQueryBuilderTest.java
index 06c7221..c2acc5b 100644
--- a/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/AtlasSolrQueryBuilderTest.java
+++ b/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/AtlasSolrQueryBuilderTest.java
@@ -200,7 +200,7 @@
 
         processSearchParameters(fileName, underTest);
 
-        Assert.assertEquals(underTest.build(), "+t10  AND  -__state_index:DELETED AND  +__typeName__index:(hive_table )  AND  ( ( +created__index:[ * TO100}  ) )");
+        Assert.assertEquals(underTest.build(), "+t10  AND  -__state_index:DELETED AND  +__typeName__index:(hive_table )  AND  ( ( +created__index:[ * TO 100}  ) )");
     }
 
     @Test
diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/AddMandatoryAttributesPatch.java b/repository/src/main/java/org/apache/atlas/repository/patches/AddMandatoryAttributesPatch.java
new file mode 100644
index 0000000..3102516
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/patches/AddMandatoryAttributesPatch.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.patches;
+
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
+import org.apache.atlas.pc.WorkItemManager;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
+import org.apache.commons.collections.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Iterator;
+import java.util.Set;
+
+import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.APPLIED;
+import static org.apache.atlas.repository.Constants.ENTITY_TYPE_PROPERTY_KEY;
+
+public class AddMandatoryAttributesPatch extends AtlasPatchHandler {
+    private static final Logger LOG = LoggerFactory.getLogger(AddMandatoryAttributesPatch.class);
+
+    private static final String PATCH_ID          = "JAVA_PATCH_0000_008";
+    private static final String PATCH_DESCRIPTION = "Add mandatory attributes for all existing entities for given typeName";
+
+    private final PatchContext            context;
+    private final String                  typeName;
+    private final List<AtlasAttributeDef> attributesToAdd;
+
+    public AddMandatoryAttributesPatch(PatchContext context, String typedefPatchId, String typeName, List<AtlasAttributeDef> attributesToAdd) {
+        super(context.getPatchRegistry(), PATCH_ID + "_" + typedefPatchId, PATCH_DESCRIPTION);
+
+        this.context         = context;
+        this.typeName        = typeName;
+        this.attributesToAdd = attributesToAdd;
+    }
+
+    @Override
+    public void apply() throws AtlasBaseException {
+        LOG.info("==> MandatoryAttributePatch.apply(): patchId={}", getPatchId());
+
+        ConcurrentPatchProcessor patchProcessor = new AddMandatoryAttributesPatchProcessor(context, typeName, attributesToAdd);
+
+        patchProcessor.apply();
+
+        setStatus(APPLIED);
+
+        LOG.info("<== MandatoryAttributePatch.apply(): patchId={}, status={}", getPatchId(), getStatus());
+    }
+
+    public static class AddMandatoryAttributesPatchProcessor extends ConcurrentPatchProcessor {
+        private final String                  typeName;
+        private final Set<String>             typeAndAllSubTypes;
+        private final List<AtlasAttributeDef> attributesToAdd;
+
+        public AddMandatoryAttributesPatchProcessor(PatchContext context, String typeName, List<AtlasAttributeDef> attributesToAdd) {
+            super(context);
+
+            AtlasEntityType entityType = getTypeRegistry().getEntityTypeByName(typeName);
+
+            this.typeName        = typeName;
+            this.attributesToAdd = attributesToAdd;
+
+            if (entityType != null) {
+                this.typeAndAllSubTypes = entityType.getTypeAndAllSubTypes();
+            } else {
+                LOG.warn("AddMandatoryAttributesPatchProcessor(): failed to find entity-type {}", typeName);
+
+                this.typeAndAllSubTypes = Collections.emptySet();
+            }
+        }
+
+        @Override
+        public void submitVerticesToUpdate(WorkItemManager manager) {
+            if (CollectionUtils.isNotEmpty(typeAndAllSubTypes)) {
+                LOG.info("Entity types to be updated with mandatory attributes: {}", typeAndAllSubTypes.size());
+
+                for (String typeName : typeAndAllSubTypes) {
+                    LOG.info("finding entities of type {}", typeName);
+
+                    AtlasGraph       graph     = getGraph();
+                    Iterable<Object> vertexIds = graph.query().has(ENTITY_TYPE_PROPERTY_KEY, typeName).vertexIds();
+                    int              count     = 0;
+
+                    for (Iterator<Object> iterator = vertexIds.iterator(); iterator.hasNext(); ) {
+                        Object vertexId = iterator.next();
+
+                        manager.checkProduce(vertexId);
+
+                        count++;
+                    }
+
+                    LOG.info("found {} entities of type {}", count, typeName);
+                }
+            }
+        }
+
+        @Override
+        protected void processVertexItem(Long vertexId, AtlasVertex vertex, String typeName, AtlasEntityType entityType) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("==> AddMandatoryAttributesPatchProcessor.processVertexItem(typeName={}, vertexId={})", typeName, vertexId);
+            }
+
+            for (AtlasAttributeDef attributeDef : attributesToAdd) {
+                AtlasAttribute attribute = entityType.getAttribute(attributeDef.getName());
+
+                if (attribute != null) {
+                    Object existingValue = vertex.getProperty(attribute.getVertexPropertyName(), Object.class);
+
+                    if (existingValue == null) {
+                        vertex.setProperty(attribute.getVertexPropertyName(), attributeDef.getDefaultValue());
+                    }
+                }
+            }
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("<== AddMandatoryAttributesPatchProcessor.processVertexItem(typeName={}, vertexId={})", typeName, vertexId);
+            }
+        }
+
+        @Override
+        protected void prepareForExecution() {
+            //do nothing
+        }
+    }
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java b/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java
index 676a0aa..89e9422 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java
@@ -28,6 +28,7 @@
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.ha.HAConfiguration;
 import org.apache.atlas.listener.ActiveStateChangeHandler;
+import org.apache.atlas.model.TypeCategory;
 import org.apache.atlas.model.patches.AtlasPatch.PatchStatus;
 import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
 import org.apache.atlas.model.typedef.AtlasBusinessMetadataDef;
@@ -41,8 +42,10 @@
 import org.apache.atlas.model.typedef.AtlasStructDef;
 import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
 import org.apache.atlas.model.typedef.AtlasTypesDef;
+import org.apache.atlas.repository.Constants;
 import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.patches.AddMandatoryAttributesPatch;
 import org.apache.atlas.repository.patches.SuperTypesUpdatePatch;
 import org.apache.atlas.repository.patches.AtlasPatchManager;
 import org.apache.atlas.repository.patches.AtlasPatchRegistry;
@@ -454,7 +457,8 @@
                     new UpdateTypeDefOptionsPatchHandler(typeDefStore, typeRegistry),
                     new SetServiceTypePatchHandler(typeDefStore, typeRegistry),
                     new UpdateAttributeMetadataHandler(typeDefStore, typeRegistry),
-                    new AddSuperTypePatchHandler(typeDefStore, typeRegistry)
+                    new AddSuperTypePatchHandler(typeDefStore, typeRegistry),
+                    new AddMandatoryAttributePatchHandler(typeDefStore, typeRegistry)
             };
 
             Map<String, PatchHandler> patchHandlerRegistry = new HashMap<>();
@@ -787,6 +791,113 @@
         }
     }
 
+    class AddMandatoryAttributePatchHandler extends PatchHandler {
+        public AddMandatoryAttributePatchHandler(AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry) {
+            super(typeDefStore, typeRegistry, new String[] { Constants.TYPEDEF_PATCH_ADD_MANDATORY_ATTRIBUTE });
+        }
+
+        @Override
+        public PatchStatus applyPatch(TypeDefPatch patch) throws AtlasBaseException {
+            String           typeName  = patch.getTypeName();
+            AtlasBaseTypeDef typeDef   = typeRegistry.getTypeDefByName(typeName);
+            PatchStatus      ret;
+
+            if (typeDef == null) {
+                throw new AtlasBaseException(AtlasErrorCode.PATCH_FOR_UNKNOWN_TYPE, patch.getAction(), typeName);
+            }
+
+            if (isPatchApplicable(patch, typeDef)) {
+                List<AtlasAttributeDef> attributesToAdd = getAttributesToAdd(patch, (AtlasStructDef) typeDef);
+
+                if (CollectionUtils.isEmpty(attributesToAdd)) {
+                    LOG.info("patch skipped: typeName={}; mandatory attributes are not valid in patch {}",patch.getTypeName(), patch.getId());
+
+                    ret = SKIPPED;
+                } else {
+                    try {
+                        RequestContext.get().setInTypePatching(true);
+
+                        RequestContext.get().setCurrentTypePatchAction(Constants.TYPEDEF_PATCH_ADD_MANDATORY_ATTRIBUTE);
+
+                        if (typeDef.getClass().equals(AtlasEntityDef.class)) {
+                            AtlasEntityDef updatedDef = new AtlasEntityDef((AtlasEntityDef) typeDef);
+
+                            updateTypeDefWithPatch(patch, updatedDef, attributesToAdd);
+
+                            typeDefStore.updateEntityDefByName(typeName, updatedDef);
+                        } else if (typeDef.getClass().equals(AtlasClassificationDef.class)) {
+                            AtlasClassificationDef updatedDef = new AtlasClassificationDef((AtlasClassificationDef) typeDef);
+
+                            updateTypeDefWithPatch(patch, updatedDef, attributesToAdd);
+
+                            typeDefStore.updateClassificationDefByName(typeName, updatedDef);
+                        } else if (typeDef.getClass().equals(AtlasStructDef.class)) {
+                            AtlasStructDef updatedDef = new AtlasStructDef((AtlasStructDef) typeDef);
+
+                            updateTypeDefWithPatch(patch, updatedDef, attributesToAdd);
+
+                            typeDefStore.updateStructDefByName(typeName, updatedDef);
+                        } else {
+                            throw new AtlasBaseException(AtlasErrorCode.PATCH_NOT_APPLICABLE_FOR_TYPE, patch.getAction(), typeDef.getClass().getSimpleName());
+                        }
+
+                        LOG.info("adding a Java patch to update entities of {} with new mandatory attributes", typeName);
+
+                        // Java patch handler to add mandatory attributes
+                        patchManager.addPatchHandler(new AddMandatoryAttributesPatch(patchManager.getContext(), patch.getId(), typeName, attributesToAdd));
+
+                        ret = APPLIED;
+                    } finally {
+                        RequestContext.get().setInTypePatching(false);
+
+                        RequestContext.clear();
+                    }
+                }
+            } else {
+                LOG.info("patch skipped: typeName={}; applyToVersion={}; updateToVersion={}",
+                        patch.getTypeName(), patch.getApplyToVersion(), patch.getUpdateToVersion());
+
+                ret = SKIPPED;
+
+            }
+
+            return ret;
+        }
+
+        // Validate mandatory attribute with non-empty default value if PRIMITIVE, not unique and doesn't exists
+        private List<AtlasAttributeDef> getAttributesToAdd(TypeDefPatch patch, AtlasStructDef updatedDef) throws AtlasBaseException {
+            List<AtlasAttributeDef> ret = new ArrayList<>();
+
+            for (AtlasAttributeDef attributeDef : patch.getAttributeDefs()) {
+                TypeCategory attributeType = typeRegistry.getType(attributeDef.getTypeName()).getTypeCategory();
+
+                if (updatedDef.hasAttribute(attributeDef.getName())) {
+                    LOG.warn("AddMandatoryAttributePatchHandler(id={}, typeName={}, attribute={}): already exists in type {}. Ignoring attribute", patch.getId(), patch.getTypeName(), attributeDef.getName(), updatedDef.getName());
+                } else if (attributeDef.getIsOptional()) {
+                    LOG.warn("AddMandatoryAttributePatchHandler(id={}, typeName={}, attribute={}): is not mandatory attribute. Ignoring attribute", patch.getId(), patch.getTypeName(), attributeDef.getName());
+                } else if (StringUtils.isEmpty(attributeDef.getDefaultValue())) {
+                    LOG.warn("AddMandatoryAttributePatchHandler(id={}, typeName={}, attribute={}): default value is missing. Ignoring attribute", patch.getId(), patch.getTypeName(), attributeDef.getName());
+                } else if (!TypeCategory.PRIMITIVE.equals(attributeType)) {
+                    LOG.warn("AddMandatoryAttributePatchHandler(id={}, typeName={}, attribute={}): type {} is not primitive. Ignoring attribute", patch.getId(), patch.getTypeName(), attributeDef.getName(), attributeDef.getTypeName());
+                } else if (attributeDef.getIsUnique()) {
+                    LOG.warn("AddMandatoryAttributePatchHandler(id={}, typeName={}, attribute={}): is not unique. Ignoring attribute", patch.getId(), patch.getTypeName(), attributeDef.getName());
+                } else {
+                    ret.add(attributeDef);
+                }
+            }
+
+            return ret;
+        }
+
+        private void updateTypeDefWithPatch(TypeDefPatch patch, AtlasStructDef updatedDef, List<AtlasAttributeDef> attributesToAdd) {
+            for (AtlasAttributeDef attributeDef : attributesToAdd) {
+                updatedDef.addAttribute(attributeDef);
+            }
+
+            updatedDef.setTypeVersion(patch.getUpdateToVersion());
+        }
+    }
+
     class UpdateAttributePatchHandler extends PatchHandler {
         public UpdateAttributePatchHandler(AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry) {
             super(typeDefStore, typeRegistry, new String[] { "UPDATE_ATTRIBUTE" });
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasStructDefStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasStructDefStoreV2.java
index 0c13a78..27dae16 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasStructDefStoreV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasStructDefStoreV2.java
@@ -442,8 +442,8 @@
                         continue;
                     }
 
-                    // new attribute - only allow if optional
-                    if (!attributeDef.getIsOptional()) {
+                    // new attribute - allow optional by default or allow mandatory only with typedef patch ADD_MANDATORY_ATTRIBUTE
+                    if (!attributeDef.getIsOptional() && !isInAddMandatoryAttributePatch()) {
                         throw new AtlasBaseException(AtlasErrorCode.CANNOT_ADD_MANDATORY_ATTRIBUTE, structDef.getName(), attributeDef.getName());
                     }
                 }
@@ -470,6 +470,11 @@
         AtlasGraphUtilsV2.setEncodedProperty(vertex, encodedStructDefPropertyKey, attrNames);
     }
 
+    public static boolean isInAddMandatoryAttributePatch() {
+        return RequestContext.get().isInTypePatching() &&
+                StringUtils.equals(Constants.TYPEDEF_PATCH_ADD_MANDATORY_ATTRIBUTE, RequestContext.get().getCurrentTypePatchAction());
+    }
+
     public static void updateVertexAddReferences(AtlasStructDef structDef, AtlasVertex vertex,
                                                  AtlasTypeDefGraphStoreV2 typeDefStore) throws AtlasBaseException {
         for (AtlasAttributeDef attributeDef : structDef.getAttributeDefs()) {
diff --git a/server-api/src/main/java/org/apache/atlas/RequestContext.java b/server-api/src/main/java/org/apache/atlas/RequestContext.java
index 7de3536..37d23c2 100644
--- a/server-api/src/main/java/org/apache/atlas/RequestContext.java
+++ b/server-api/src/main/java/org/apache/atlas/RequestContext.java
@@ -75,6 +75,7 @@
     private boolean     isInTypePatching           = false;
     private boolean     createShellEntityForNonExistingReference = false;
     private boolean     skipFailedEntities = false;
+    private String      currentTypePatchAction = "";
 
     private RequestContext() {
     }
@@ -237,6 +238,14 @@
         this.skipFailedEntities = skipFailedEntities;
     }
 
+    public String getCurrentTypePatchAction() {
+        return currentTypePatchAction;
+    }
+
+    public void setCurrentTypePatchAction(String currentTypePatchAction) {
+        this.currentTypePatchAction = currentTypePatchAction;
+    }
+
     public void recordEntityUpdate(AtlasEntityHeader entity) {
         if (entity != null && entity.getGuid() != null && ! entitiesToSkipUpdate.contains(entity.getGuid())) {
             updatedEntities.put(entity.getGuid(), entity);