ATLAS-4400: Fixed hook and Atlas preprocessor to handle the S3 V2 directory objectPrefix issue when Atlas server and hook versions mismatch

Signed-off-by: sidmishra <sidmishra@apache.org>
diff --git a/common/src/main/java/org/apache/atlas/utils/AtlasPathExtractorUtil.java b/common/src/main/java/org/apache/atlas/utils/AtlasPathExtractorUtil.java
index a9f2e50..01a67b7 100644
--- a/common/src/main/java/org/apache/atlas/utils/AtlasPathExtractorUtil.java
+++ b/common/src/main/java/org/apache/atlas/utils/AtlasPathExtractorUtil.java
@@ -231,7 +231,7 @@
                 ret = new AtlasEntity(AWS_S3_V2_PSEUDO_DIR);
 
                 ret.setRelationshipAttribute(ATTRIBUTE_CONTAINER, parentObjId);
-                ret.setAttribute(ATTRIBUTE_OBJECT_PREFIX, parentPath);
+                ret.setAttribute(ATTRIBUTE_OBJECT_PREFIX, subDirPath);
                 ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, subDirQualifiedName);
                 ret.setAttribute(ATTRIBUTE_NAME, subDirName);
 
diff --git a/common/src/test/java/org/apache/atlas/utils/AtlasPathExtractorUtilTest.java b/common/src/test/java/org/apache/atlas/utils/AtlasPathExtractorUtilTest.java
index 6bf5d57..f35e9ae 100644
--- a/common/src/test/java/org/apache/atlas/utils/AtlasPathExtractorUtilTest.java
+++ b/common/src/test/java/org/apache/atlas/utils/AtlasPathExtractorUtilTest.java
@@ -372,16 +372,16 @@
 
         if (pathQName.equalsIgnoreCase(entityQName)){
             assertEquals(entity.getAttribute(ATTRIBUTE_NAME), "Irradiance_A.csv");
-            assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/1234567890/renders/");
+            assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/1234567890/renders/Irradiance_A.csv/");
         } else {
             pathQName = s3Scheme + "aws_my_bucket1/1234567890/" + QNAME_METADATA_NAMESPACE;
             if (pathQName.equalsIgnoreCase(entityQName)){
                 assertEquals(entity.getAttribute(ATTRIBUTE_NAME), "1234567890");
-                assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/");
+                assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/1234567890/");
             } else {
                 assertEquals(entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME), s3Scheme + "aws_my_bucket1/1234567890/renders/" + QNAME_METADATA_NAMESPACE);
                 assertEquals(entity.getAttribute(ATTRIBUTE_NAME), "renders");
-                assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/1234567890/");
+                assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/1234567890/renders/");
             }
         }
     }
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 5643af9..49c504f 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -154,6 +154,7 @@
     public static final String CONSUMER_PREPROCESS_HIVE_PROCESS_UPD_NAME_WITH_QUALIFIED_NAME = "atlas.notification.consumer.preprocess.hive_process.update.name.with.qualified_name";
     public static final String CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS          = "atlas.notification.consumer.preprocess.hive_types.remove.ownedref.attrs";
     public static final String CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS         = "atlas.notification.consumer.preprocess.rdbms_types.remove.ownedref.attrs";
+    public static final String CONSUMER_PREPROCESS_S3_V2_DIRECTORY_PRUNE_OBJECT_PREFIX       = "atlas.notification.consumer.preprocess.s3_v2_directory.prune.object_prefix";
     public static final String CONSUMER_AUTHORIZE_USING_MESSAGE_USER                         = "atlas.notification.authorize.using.message.user";
     public static final String CONSUMER_AUTHORIZE_AUTHN_CACHE_TTL_SECONDS                    = "atlas.notification.authorize.authn.cache.ttl.seconds";
 
@@ -182,6 +183,7 @@
     private final Map<String, PreprocessAction> hiveTablesCache;
     private final boolean                       hiveTypesRemoveOwnedRefAttrs;
     private final boolean                       rdbmsTypesRemoveOwnedRefAttrs;
+    private final boolean                       s3V2DirectoryPruneObjectPrefix;
     private final boolean                       preprocessEnabled;
     private final boolean createShellEntityForNonExistingReference;
     private final boolean                       authorizeUsingMessageUser;
@@ -310,12 +312,15 @@
 
         hiveTypesRemoveOwnedRefAttrs  = applicationProperties.getBoolean(CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS, true);
         rdbmsTypesRemoveOwnedRefAttrs = applicationProperties.getBoolean(CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS, true);
-        preprocessEnabled             = skipHiveColumnLineageHive20633 || updateHiveProcessNameWithQualifiedName || hiveTypesRemoveOwnedRefAttrs || rdbmsTypesRemoveOwnedRefAttrs || !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || !hiveDummyDatabasesToIgnore.isEmpty() || !hiveDummyTablesToIgnore.isEmpty() || !hiveTablePrefixesToIgnore.isEmpty();
+        s3V2DirectoryPruneObjectPrefix = applicationProperties.getBoolean(CONSUMER_PREPROCESS_S3_V2_DIRECTORY_PRUNE_OBJECT_PREFIX, true);
+
+        preprocessEnabled             = skipHiveColumnLineageHive20633 || updateHiveProcessNameWithQualifiedName || hiveTypesRemoveOwnedRefAttrs || rdbmsTypesRemoveOwnedRefAttrs || s3V2DirectoryPruneObjectPrefix || !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || !hiveDummyDatabasesToIgnore.isEmpty() || !hiveDummyTablesToIgnore.isEmpty() || !hiveTablePrefixesToIgnore.isEmpty();
         entityCorrelationManager      = new EntityCorrelationManager(entityCorrelationStore);
         LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, skipHiveColumnLineageHive20633);
         LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, skipHiveColumnLineageHive20633InputsThreshold);
         LOG.info("{}={}", CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS, hiveTypesRemoveOwnedRefAttrs);
         LOG.info("{}={}", CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS, rdbmsTypesRemoveOwnedRefAttrs);
+        LOG.info("{}={}", CONSUMER_PREPROCESS_S3_V2_DIRECTORY_PRUNE_OBJECT_PREFIX, s3V2DirectoryPruneObjectPrefix);
         LOG.info("{}={}", CONSUMER_COMMIT_BATCH_SIZE, commitBatchSize);
         LOG.info("{}={}", CONSUMER_DISABLED, consumerDisabled);
     }
@@ -982,7 +987,7 @@
         if (preprocessEnabled) {
             context = new PreprocessorContext(kafkaMsg, typeRegistry, hiveTablesToIgnore, hiveTablesToPrune, hiveTablesCache,
                     hiveDummyDatabasesToIgnore, hiveDummyTablesToIgnore, hiveTablePrefixesToIgnore, hiveTypesRemoveOwnedRefAttrs,
-                    rdbmsTypesRemoveOwnedRefAttrs, updateHiveProcessNameWithQualifiedName, entityCorrelationManager);
+                    rdbmsTypesRemoveOwnedRefAttrs, s3V2DirectoryPruneObjectPrefix, updateHiveProcessNameWithQualifiedName, entityCorrelationManager);
 
             if (context.isHivePreprocessEnabled()) {
                 preprocessHiveTypes(context);
@@ -996,6 +1001,10 @@
                 rdbmsTypeRemoveOwnedRefAttrs(context);
             }
 
+            if (s3V2DirectoryPruneObjectPrefix) {
+                pruneObjectPrefixForS3V2Directory(context);
+            }
+
             context.moveRegisteredReferredEntities();
 
             if (context.isHivePreprocessEnabled() && CollectionUtils.isNotEmpty(context.getEntities()) && context.getEntities().size() > 1) {
@@ -1040,6 +1049,28 @@
         }
     }
 
+    private void pruneObjectPrefixForS3V2Directory(PreprocessorContext context) {
+        List<AtlasEntity> entities = new ArrayList<>();
+
+        if (CollectionUtils.isNotEmpty(context.getEntities())) {
+            entities.addAll(context.getEntities());
+        }
+
+        if (MapUtils.isNotEmpty(context.getReferredEntities())) {
+            entities.addAll(context.getReferredEntities().values());
+        }
+
+        if (CollectionUtils.isNotEmpty(entities)) {
+            for (AtlasEntity entity : entities) {
+                EntityPreprocessor preprocessor = EntityPreprocessor.getS3V2Preprocessor(entity.getTypeName());
+
+                if (preprocessor != null) {
+                    preprocessor.preprocess(entity, context);
+                }
+            }
+        }
+    }
+
     private void preprocessHiveTypes(PreprocessorContext context) {
         List<AtlasEntity> entities = context.getEntities();
 
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/AWSS3V2Preprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/AWSS3V2Preprocessor.java
new file mode 100644
index 0000000..6102572
--- /dev/null
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/AWSS3V2Preprocessor.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification.preprocessor;
+
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.hadoop.fs.Path;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AWSS3V2Preprocessor {
+    private static final Logger LOG = LoggerFactory.getLogger(AWSS3V2Preprocessor.class);
+
+    private static final String AWS_S3_V2_DIR_TYPE       = "aws_s3_v2_directory";
+    private static final String ATTRIBUTE_OBJECT_PREFIX  = "objectPrefix";
+    private static final String SCHEME_SEPARATOR             = "://";
+
+    static class AWSS3V2DirectoryPreprocessor extends EntityPreprocessor {
+        protected AWSS3V2DirectoryPreprocessor() {
+            super(AWS_S3_V2_DIR_TYPE);
+        }
+
+        @Override
+        public void preprocess(AtlasEntity entity, PreprocessorContext context) {
+            if (context.getS3V2DirectoryPruneObjectPrefix()) {
+                String qualifiedName = (String) entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME);
+                String objectPrefix = (String) entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX);
+
+                if (isObjectPrefixPruneNeeded(qualifiedName, objectPrefix)) {
+                    if (objectPrefix.lastIndexOf(Path.SEPARATOR) == -1) {
+                        objectPrefix = Path.SEPARATOR;
+                    } else {
+                        if (doesEndsWithPathSeparator(objectPrefix)) {
+                            objectPrefix = removeLastPathSeparator(objectPrefix);
+                        }
+
+                        objectPrefix = objectPrefix.substring(0, objectPrefix.lastIndexOf(Path.SEPARATOR) + 1);
+                    }
+
+                    LOG.info("Aws S3 V2 Preprocessor: Pruning {} from {} to {}", ATTRIBUTE_OBJECT_PREFIX + QNAME_SEP_CLUSTER_NAME + AWS_S3_V2_DIR_TYPE,
+                                entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), objectPrefix);
+
+                    entity.setAttribute(ATTRIBUTE_OBJECT_PREFIX, objectPrefix);
+                }
+            }
+        }
+
+        private boolean isObjectPrefixPruneNeeded(String qualifiedName, String objectPrefix) {
+            return (StringUtils.isNotBlank(qualifiedName)
+                    && StringUtils.isNotBlank(objectPrefix)
+                    && qualifiedName.contains(getSchemeAndBucket(qualifiedName) + objectPrefix + QNAME_SEP_CLUSTER_NAME));
+        }
+
+        private String getSchemeAndBucket(String qualifiedName) {
+            String ret = "";
+
+            if (StringUtils.isNotEmpty(qualifiedName) && qualifiedName.contains(SCHEME_SEPARATOR)) {
+                int schemeSeparatorEndPosition = qualifiedName.indexOf(SCHEME_SEPARATOR) + SCHEME_SEPARATOR.length();
+                int bucketEndPosition = qualifiedName.indexOf(Path.SEPARATOR, schemeSeparatorEndPosition);
+                ret = qualifiedName.substring(0, bucketEndPosition);
+            }
+
+            return ret;
+        }
+
+        private boolean doesEndsWithPathSeparator(String path) {
+            return path.endsWith(Path.SEPARATOR);
+        }
+
+        private String removeLastPathSeparator(String path) {
+            return StringUtils.chop(path);
+        }
+    }
+
+}
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
index 7f0cafe..f8eac4c 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
@@ -64,8 +64,9 @@
     public static final char   QNAME_SEP_ENTITY_NAME  = '.';
     public static final String QNAME_SD_SUFFIX        = "_storage";
 
-    private static final Map<String, EntityPreprocessor> HIVE_PREPROCESSOR_MAP  = new HashMap<>();
-    private static final Map<String, EntityPreprocessor> RDBMS_PREPROCESSOR_MAP = new HashMap<>();
+    private static final Map<String, EntityPreprocessor> HIVE_PREPROCESSOR_MAP      = new HashMap<>();
+    private static final Map<String, EntityPreprocessor> RDBMS_PREPROCESSOR_MAP     = new HashMap<>();
+    private static final Map<String, EntityPreprocessor> AWS_S3_V2_PREPROCESSOR_MAP = new HashMap<>();
 
     private final String typeName;
 
@@ -88,6 +89,10 @@
                                                                     new RdbmsPreprocessor.RdbmsTablePreprocessor()
        };
 
+        EntityPreprocessor[] s3V2Preprocessors = new EntityPreprocessor[] {
+                new AWSS3V2Preprocessor.AWSS3V2DirectoryPreprocessor()
+        };
+
         for (EntityPreprocessor preprocessor : hivePreprocessors) {
             HIVE_PREPROCESSOR_MAP.put(preprocessor.getTypeName(), preprocessor);
         }
@@ -95,6 +100,10 @@
         for (EntityPreprocessor preprocessor : rdbmsPreprocessors) {
             RDBMS_PREPROCESSOR_MAP.put(preprocessor.getTypeName(), preprocessor);
         }
+
+        for (EntityPreprocessor preprocessor : s3V2Preprocessors) {
+            AWS_S3_V2_PREPROCESSOR_MAP.put(preprocessor.getTypeName(), preprocessor);
+        }
     }
 
     protected EntityPreprocessor(String typeName) {
@@ -116,6 +125,10 @@
         return typeName != null ? RDBMS_PREPROCESSOR_MAP.get(typeName) : null;
     }
 
+    public static EntityPreprocessor getS3V2Preprocessor(String typeName) {
+        return typeName != null ? AWS_S3_V2_PREPROCESSOR_MAP.get(typeName) : null;
+    }
+
     public static String getQualifiedName(AtlasEntity entity) {
         Object obj = entity != null ? entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME) : null;
 
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
index 59f6440..f930d9f 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
@@ -63,6 +63,7 @@
     private final boolean                             updateHiveProcessNameWithQualifiedName;
     private final boolean                             hiveTypesRemoveOwnedRefAttrs;
     private final boolean                             rdbmsTypesRemoveOwnedRefAttrs;
+    private final boolean                             s3V2DirectoryPruneObjectPrefix;
     private final boolean                             isHivePreProcessEnabled;
     private final Set<String>                         ignoredEntities        = new HashSet<>();
     private final Set<String>                         prunedEntities         = new HashSet<>();
@@ -73,7 +74,7 @@
     private final EntityCorrelationManager            correlationManager;
     private       List<AtlasEntity>                   postUpdateEntities     = null;
 
-    public PreprocessorContext(AtlasKafkaMessage<HookNotification> kafkaMessage, AtlasTypeRegistry typeRegistry, List<Pattern> hiveTablesToIgnore, List<Pattern> hiveTablesToPrune, Map<String, PreprocessAction> hiveTablesCache, List<String> hiveDummyDatabasesToIgnore, List<String> hiveDummyTablesToIgnore, List<String> hiveTablePrefixesToIgnore, boolean hiveTypesRemoveOwnedRefAttrs, boolean rdbmsTypesRemoveOwnedRefAttrs, boolean updateHiveProcessNameWithQualifiedName, EntityCorrelationManager correlationManager) {
+    public PreprocessorContext(AtlasKafkaMessage<HookNotification> kafkaMessage, AtlasTypeRegistry typeRegistry, List<Pattern> hiveTablesToIgnore, List<Pattern> hiveTablesToPrune, Map<String, PreprocessAction> hiveTablesCache, List<String> hiveDummyDatabasesToIgnore, List<String> hiveDummyTablesToIgnore, List<String> hiveTablePrefixesToIgnore, boolean hiveTypesRemoveOwnedRefAttrs, boolean rdbmsTypesRemoveOwnedRefAttrs, boolean s3V2DirectoryPruneObjectPrefix, boolean updateHiveProcessNameWithQualifiedName, EntityCorrelationManager correlationManager) {
         this.kafkaMessage                           = kafkaMessage;
         this.typeRegistry                           = typeRegistry;
         this.hiveTablesToIgnore                     = hiveTablesToIgnore;
@@ -84,6 +85,7 @@
         this.hiveTablePrefixesToIgnore              = hiveTablePrefixesToIgnore;
         this.hiveTypesRemoveOwnedRefAttrs           = hiveTypesRemoveOwnedRefAttrs;
         this.rdbmsTypesRemoveOwnedRefAttrs          = rdbmsTypesRemoveOwnedRefAttrs;
+        this.s3V2DirectoryPruneObjectPrefix         = s3V2DirectoryPruneObjectPrefix;
         this.updateHiveProcessNameWithQualifiedName = updateHiveProcessNameWithQualifiedName;
 
         final HookNotification  message = kafkaMessage.getMessage();
@@ -124,6 +126,10 @@
 
     public boolean getRdbmsTypesRemoveOwnedRefAttrs() { return rdbmsTypesRemoveOwnedRefAttrs; }
 
+    public boolean getS3V2DirectoryPruneObjectPrefix() {
+        return s3V2DirectoryPruneObjectPrefix;
+    }
+
     public boolean isHivePreprocessEnabled() {
         return isHivePreProcessEnabled;
     }
diff --git a/webapp/src/test/java/org/apache/atlas/notification/preprocessor/AWSS3V2PreprocessorTest.java b/webapp/src/test/java/org/apache/atlas/notification/preprocessor/AWSS3V2PreprocessorTest.java
new file mode 100644
index 0000000..9c6c92a
--- /dev/null
+++ b/webapp/src/test/java/org/apache/atlas/notification/preprocessor/AWSS3V2PreprocessorTest.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification.preprocessor;
+
+import org.apache.atlas.kafka.AtlasKafkaMessage;
+import org.apache.atlas.kafka.KafkaNotification;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.model.notification.HookNotification;
+import org.apache.atlas.utils.AtlasPathExtractorUtil;
+import org.apache.atlas.utils.PathExtractorContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.Test;
+import org.apache.hadoop.fs.Path;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
+
+public class AWSS3V2PreprocessorTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AWSS3V2PreprocessorTest.class);
+    private static final String METADATA_NAMESPACE          = "cm";
+    private static final String QNAME_METADATA_NAMESPACE    = '@' + METADATA_NAMESPACE;
+    private static final String AWS_S3_MODEL_VERSION_V2     = "V2";
+
+    private static final String SCHEME_SEPARATOR            = "://";
+    private static final String S3_SCHEME                   = "s3" + SCHEME_SEPARATOR;
+    private static final String S3A_SCHEME                  = "s3a" + SCHEME_SEPARATOR;
+    private static final String ABFS_SCHEME                 = "abfs" + SCHEME_SEPARATOR;
+
+    private static final String ATTRIBUTE_NAME              = "name";
+    private static final String ATTRIBUTE_OBJECT_PREFIX     = "objectPrefix";
+    private static final String ATTRIBUTE_QUALIFIED_NAME    = "qualifiedName";
+
+    private static final List<String>  EMPTY_STR_LIST       = new ArrayList<>();
+    private static final List<Pattern> EMPTY_PATTERN_LIST   = new ArrayList<>();
+
+    @Test
+    public void testS3V2DirectoryPreprocessorForOtherTypes() {
+        PathExtractorContext    extractorContext    = new PathExtractorContext(METADATA_NAMESPACE);
+        final String            ABFS_PATH           = ABFS_SCHEME + "data@razrangersan.dfs.core.windows.net/tmp/cdp-demo/sample.csv";
+        Path                    path                = new Path(ABFS_PATH);
+        AtlasEntityWithExtInfo  entityWithExtInfo   = AtlasPathExtractorUtil.getPathEntity(path, extractorContext);
+        AtlasEntity             entity              = entityWithExtInfo.getEntity();
+
+        assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), null);
+
+        HookNotification                    hookNotification    = new HookNotification.EntityCreateRequestV2("test", new AtlasEntity.AtlasEntitiesWithExtInfo(entity));
+        AtlasKafkaMessage<HookNotification> kafkaMsg            = new AtlasKafkaMessage(hookNotification, -1, KafkaNotification.ATLAS_HOOK_TOPIC, -1);
+        PreprocessorContext                 context             = new PreprocessorContext(kafkaMsg, null, EMPTY_PATTERN_LIST, EMPTY_PATTERN_LIST, null,
+                EMPTY_STR_LIST, EMPTY_STR_LIST, EMPTY_STR_LIST, false,
+                false, true, false, null);
+
+        EntityPreprocessor                  preprocessor        = new AWSS3V2Preprocessor.AWSS3V2DirectoryPreprocessor();
+
+        preprocessor.preprocess(entity, context);
+
+        assertNotEquals(entity.getTypeName(), preprocessor.getTypeName());
+    }
+
+    @Test
+    public void testS3V2DirectoryPreprocessorForFullPath() {
+        PathExtractorContext    extractorContext    = new PathExtractorContext(METADATA_NAMESPACE, AWS_S3_MODEL_VERSION_V2);
+        final String            S3_PATH             = S3A_SCHEME + "aws_bucket1/1234567890/test/data1";
+        Path                    path                = new Path(S3_PATH);
+        AtlasEntityWithExtInfo  entityWithExtInfo   = AtlasPathExtractorUtil.getPathEntity(path, extractorContext);
+        AtlasEntity             entity              = entityWithExtInfo.getEntity();
+
+        assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/1234567890/test/data1/");
+
+        HookNotification                    hookNotification    = new HookNotification.EntityCreateRequestV2("test", new AtlasEntity.AtlasEntitiesWithExtInfo(entity));
+        AtlasKafkaMessage<HookNotification> kafkaMsg            = new AtlasKafkaMessage(hookNotification, -1, KafkaNotification.ATLAS_HOOK_TOPIC, -1);
+        PreprocessorContext                 context             = new PreprocessorContext(kafkaMsg, null, EMPTY_PATTERN_LIST, EMPTY_PATTERN_LIST, null,
+                                                                        EMPTY_STR_LIST, EMPTY_STR_LIST, EMPTY_STR_LIST, false,
+                                            false, true, false, null);
+
+        EntityPreprocessor                  preprocessor        = new AWSS3V2Preprocessor.AWSS3V2DirectoryPreprocessor();
+
+        preprocessor.preprocess(entity, context);
+
+        assertEquals(entity.getTypeName(), preprocessor.getTypeName());
+        assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/1234567890/test/");
+        assertEquals(entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME), S3A_SCHEME + "aws_bucket1/1234567890/test/data1/" + QNAME_METADATA_NAMESPACE);
+        assertEquals(entity.getAttribute(ATTRIBUTE_NAME), "data1");
+    }
+
+    @Test
+    public void testS3V2DirectoryPreprocessorForRootLevelDirectory() {
+        PathExtractorContext    extractorContext    = new PathExtractorContext(METADATA_NAMESPACE, AWS_S3_MODEL_VERSION_V2);
+        final String            S3_PATH             = S3A_SCHEME + "aws_bucket1/root1";
+        Path                    path                = new Path(S3_PATH);
+        AtlasEntityWithExtInfo  entityWithExtInfo   = AtlasPathExtractorUtil.getPathEntity(path, extractorContext);
+        AtlasEntity             entity              = entityWithExtInfo.getEntity();
+
+        assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/root1/");
+
+        HookNotification                    hookNotification    = new HookNotification.EntityCreateRequestV2("test", new AtlasEntity.AtlasEntitiesWithExtInfo(entity));
+        AtlasKafkaMessage<HookNotification> kafkaMsg            = new AtlasKafkaMessage(hookNotification, -1, KafkaNotification.ATLAS_HOOK_TOPIC, -1);
+        PreprocessorContext                 context             = new PreprocessorContext(kafkaMsg, null, EMPTY_PATTERN_LIST, EMPTY_PATTERN_LIST, null,
+                EMPTY_STR_LIST, EMPTY_STR_LIST, EMPTY_STR_LIST, false,
+                false, true, false, null);
+
+        EntityPreprocessor                  preprocessor        = new AWSS3V2Preprocessor.AWSS3V2DirectoryPreprocessor();
+
+        preprocessor.preprocess(entity, context);
+
+        assertEquals(entity.getTypeName(), preprocessor.getTypeName());
+        assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/");
+        assertEquals(entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME), S3A_SCHEME + "aws_bucket1/root1/" + QNAME_METADATA_NAMESPACE);
+        assertEquals(entity.getAttribute(ATTRIBUTE_NAME), "root1");
+    }
+
+    @Test
+    public void testS3V2DirectoryPreprocessorWithSameDirNames() {
+        PathExtractorContext    extractorContext    = new PathExtractorContext(METADATA_NAMESPACE, AWS_S3_MODEL_VERSION_V2);
+        final String            S3_PATH             = S3_SCHEME + "temp/temp/temp/temp/";
+        Path                    path                = new Path(S3_PATH);
+        AtlasEntityWithExtInfo  entityWithExtInfo   = AtlasPathExtractorUtil.getPathEntity(path, extractorContext);
+        AtlasEntity             entity              = entityWithExtInfo.getEntity();
+
+        assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/temp/temp/temp/");
+
+        HookNotification                    hookNotification    = new HookNotification.EntityCreateRequestV2("test", new AtlasEntity.AtlasEntitiesWithExtInfo(entity));
+        AtlasKafkaMessage<HookNotification> kafkaMsg            = new AtlasKafkaMessage(hookNotification, -1, KafkaNotification.ATLAS_HOOK_TOPIC, -1);
+        PreprocessorContext                 context             = new PreprocessorContext(kafkaMsg, null, EMPTY_PATTERN_LIST, EMPTY_PATTERN_LIST, null,
+                EMPTY_STR_LIST, EMPTY_STR_LIST, EMPTY_STR_LIST, false,
+                false, true, false, null);
+
+        EntityPreprocessor                  preprocessor        = new AWSS3V2Preprocessor.AWSS3V2DirectoryPreprocessor();
+
+        preprocessor.preprocess(entity, context);
+
+        assertEquals(entity.getTypeName(), preprocessor.getTypeName());
+        assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/temp/temp/");
+        assertEquals(entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME), S3_SCHEME + "temp/temp/temp/temp/" + QNAME_METADATA_NAMESPACE);
+        assertEquals(entity.getAttribute(ATTRIBUTE_NAME), "temp");
+    }
+
+    @Test
+    public void testS3V2DirectoryPreprocessorForHookWithCorrectObjectPrefix() {
+        PathExtractorContext    extractorContext    = new PathExtractorContext(METADATA_NAMESPACE, AWS_S3_MODEL_VERSION_V2);
+        final String            S3_PATH             = S3A_SCHEME + "aws_bucket1/root1";
+        Path                    path                = new Path(S3_PATH);
+        AtlasEntityWithExtInfo  entityWithExtInfo   = AtlasPathExtractorUtil.getPathEntity(path, extractorContext);
+        AtlasEntity             entity              = entityWithExtInfo.getEntity();
+
+        entity.setAttribute(ATTRIBUTE_OBJECT_PREFIX, "/");
+        assertNotEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/root1/");
+
+        HookNotification                    hookNotification    = new HookNotification.EntityCreateRequestV2("test", new AtlasEntity.AtlasEntitiesWithExtInfo(entity));
+        AtlasKafkaMessage<HookNotification> kafkaMsg            = new AtlasKafkaMessage(hookNotification, -1, KafkaNotification.ATLAS_HOOK_TOPIC, -1);
+        PreprocessorContext                 context             = new PreprocessorContext(kafkaMsg, null, EMPTY_PATTERN_LIST, EMPTY_PATTERN_LIST, null,
+                EMPTY_STR_LIST, EMPTY_STR_LIST, EMPTY_STR_LIST, false,
+                false, true, false, null);
+
+        EntityPreprocessor                  preprocessor        = new AWSS3V2Preprocessor.AWSS3V2DirectoryPreprocessor();
+
+        preprocessor.preprocess(entity, context);
+
+        assertEquals(entity.getTypeName(), preprocessor.getTypeName());
+        assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/");
+        assertEquals(entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME), S3A_SCHEME + "aws_bucket1/root1/" + QNAME_METADATA_NAMESPACE);
+        assertEquals(entity.getAttribute(ATTRIBUTE_NAME), "root1");
+    }
+
+    @Test
+    public void testS3V2DirectoryPreprocessorForHookWithCorrectObjectPrefixHavingSameDirNames() {
+        PathExtractorContext    extractorContext    = new PathExtractorContext(METADATA_NAMESPACE, AWS_S3_MODEL_VERSION_V2);
+        final String            S3_PATH             = S3A_SCHEME + "temp/temp/temp/temp/";
+        Path                    path                = new Path(S3_PATH);
+        AtlasEntityWithExtInfo  entityWithExtInfo   = AtlasPathExtractorUtil.getPathEntity(path, extractorContext);
+        AtlasEntity             entity              = entityWithExtInfo.getEntity();
+
+        assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/temp/temp/temp/");
+        entity.setAttribute(ATTRIBUTE_OBJECT_PREFIX, "/temp/temp/");
+        assertNotEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/temp/temp/temp/");
+
+        HookNotification                    hookNotification    = new HookNotification.EntityCreateRequestV2("test", new AtlasEntity.AtlasEntitiesWithExtInfo(entity));
+        AtlasKafkaMessage<HookNotification> kafkaMsg            = new AtlasKafkaMessage(hookNotification, -1, KafkaNotification.ATLAS_HOOK_TOPIC, -1);
+        PreprocessorContext                 context             = new PreprocessorContext(kafkaMsg, null, EMPTY_PATTERN_LIST, EMPTY_PATTERN_LIST, null,
+                EMPTY_STR_LIST, EMPTY_STR_LIST, EMPTY_STR_LIST, false,
+                false, true, false, null);
+
+        EntityPreprocessor                  preprocessor        = new AWSS3V2Preprocessor.AWSS3V2DirectoryPreprocessor();
+
+        preprocessor.preprocess(entity, context);
+
+        assertEquals(entity.getTypeName(), preprocessor.getTypeName());
+        assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/temp/temp/");
+        assertEquals(entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME), S3A_SCHEME + "temp/temp/temp/temp/" + QNAME_METADATA_NAMESPACE);
+        assertEquals(entity.getAttribute(ATTRIBUTE_NAME), "temp");
+    }
+}