ATLAS-4400: Fixed Hook and Atlas Preprocessor to handle S3 V2 directory objectPrefix Issue with Atlas Server and Hook versions mismatch
Signed-off-by: sidmishra <sidmishra@apache.org>
diff --git a/common/src/main/java/org/apache/atlas/utils/AtlasPathExtractorUtil.java b/common/src/main/java/org/apache/atlas/utils/AtlasPathExtractorUtil.java
index a9f2e50..01a67b7 100644
--- a/common/src/main/java/org/apache/atlas/utils/AtlasPathExtractorUtil.java
+++ b/common/src/main/java/org/apache/atlas/utils/AtlasPathExtractorUtil.java
@@ -231,7 +231,7 @@
ret = new AtlasEntity(AWS_S3_V2_PSEUDO_DIR);
ret.setRelationshipAttribute(ATTRIBUTE_CONTAINER, parentObjId);
- ret.setAttribute(ATTRIBUTE_OBJECT_PREFIX, parentPath);
+ ret.setAttribute(ATTRIBUTE_OBJECT_PREFIX, subDirPath);
ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, subDirQualifiedName);
ret.setAttribute(ATTRIBUTE_NAME, subDirName);
diff --git a/common/src/test/java/org/apache/atlas/utils/AtlasPathExtractorUtilTest.java b/common/src/test/java/org/apache/atlas/utils/AtlasPathExtractorUtilTest.java
index 6bf5d57..f35e9ae 100644
--- a/common/src/test/java/org/apache/atlas/utils/AtlasPathExtractorUtilTest.java
+++ b/common/src/test/java/org/apache/atlas/utils/AtlasPathExtractorUtilTest.java
@@ -372,16 +372,16 @@
if (pathQName.equalsIgnoreCase(entityQName)){
assertEquals(entity.getAttribute(ATTRIBUTE_NAME), "Irradiance_A.csv");
- assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/1234567890/renders/");
+ assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/1234567890/renders/Irradiance_A.csv/");
} else {
pathQName = s3Scheme + "aws_my_bucket1/1234567890/" + QNAME_METADATA_NAMESPACE;
if (pathQName.equalsIgnoreCase(entityQName)){
assertEquals(entity.getAttribute(ATTRIBUTE_NAME), "1234567890");
- assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/");
+ assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/1234567890/");
} else {
assertEquals(entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME), s3Scheme + "aws_my_bucket1/1234567890/renders/" + QNAME_METADATA_NAMESPACE);
assertEquals(entity.getAttribute(ATTRIBUTE_NAME), "renders");
- assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/1234567890/");
+ assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/1234567890/renders/");
}
}
}
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 5643af9..49c504f 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -154,6 +154,7 @@
public static final String CONSUMER_PREPROCESS_HIVE_PROCESS_UPD_NAME_WITH_QUALIFIED_NAME = "atlas.notification.consumer.preprocess.hive_process.update.name.with.qualified_name";
public static final String CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS = "atlas.notification.consumer.preprocess.hive_types.remove.ownedref.attrs";
public static final String CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS = "atlas.notification.consumer.preprocess.rdbms_types.remove.ownedref.attrs";
+ public static final String CONSUMER_PREPROCESS_S3_V2_DIRECTORY_PRUNE_OBJECT_PREFIX = "atlas.notification.consumer.preprocess.s3_v2_directory.prune.object_prefix";
public static final String CONSUMER_AUTHORIZE_USING_MESSAGE_USER = "atlas.notification.authorize.using.message.user";
public static final String CONSUMER_AUTHORIZE_AUTHN_CACHE_TTL_SECONDS = "atlas.notification.authorize.authn.cache.ttl.seconds";
@@ -182,6 +183,7 @@
private final Map<String, PreprocessAction> hiveTablesCache;
private final boolean hiveTypesRemoveOwnedRefAttrs;
private final boolean rdbmsTypesRemoveOwnedRefAttrs;
+ private final boolean s3V2DirectoryPruneObjectPrefix;
private final boolean preprocessEnabled;
private final boolean createShellEntityForNonExistingReference;
private final boolean authorizeUsingMessageUser;
@@ -310,12 +312,15 @@
hiveTypesRemoveOwnedRefAttrs = applicationProperties.getBoolean(CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS, true);
rdbmsTypesRemoveOwnedRefAttrs = applicationProperties.getBoolean(CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS, true);
- preprocessEnabled = skipHiveColumnLineageHive20633 || updateHiveProcessNameWithQualifiedName || hiveTypesRemoveOwnedRefAttrs || rdbmsTypesRemoveOwnedRefAttrs || !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || !hiveDummyDatabasesToIgnore.isEmpty() || !hiveDummyTablesToIgnore.isEmpty() || !hiveTablePrefixesToIgnore.isEmpty();
+ s3V2DirectoryPruneObjectPrefix = applicationProperties.getBoolean(CONSUMER_PREPROCESS_S3_V2_DIRECTORY_PRUNE_OBJECT_PREFIX, true);
+
+ preprocessEnabled = skipHiveColumnLineageHive20633 || updateHiveProcessNameWithQualifiedName || hiveTypesRemoveOwnedRefAttrs || rdbmsTypesRemoveOwnedRefAttrs || s3V2DirectoryPruneObjectPrefix || !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || !hiveDummyDatabasesToIgnore.isEmpty() || !hiveDummyTablesToIgnore.isEmpty() || !hiveTablePrefixesToIgnore.isEmpty();
entityCorrelationManager = new EntityCorrelationManager(entityCorrelationStore);
LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, skipHiveColumnLineageHive20633);
LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, skipHiveColumnLineageHive20633InputsThreshold);
LOG.info("{}={}", CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS, hiveTypesRemoveOwnedRefAttrs);
LOG.info("{}={}", CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS, rdbmsTypesRemoveOwnedRefAttrs);
+ LOG.info("{}={}", CONSUMER_PREPROCESS_S3_V2_DIRECTORY_PRUNE_OBJECT_PREFIX, s3V2DirectoryPruneObjectPrefix);
LOG.info("{}={}", CONSUMER_COMMIT_BATCH_SIZE, commitBatchSize);
LOG.info("{}={}", CONSUMER_DISABLED, consumerDisabled);
}
@@ -982,7 +987,7 @@
if (preprocessEnabled) {
context = new PreprocessorContext(kafkaMsg, typeRegistry, hiveTablesToIgnore, hiveTablesToPrune, hiveTablesCache,
hiveDummyDatabasesToIgnore, hiveDummyTablesToIgnore, hiveTablePrefixesToIgnore, hiveTypesRemoveOwnedRefAttrs,
- rdbmsTypesRemoveOwnedRefAttrs, updateHiveProcessNameWithQualifiedName, entityCorrelationManager);
+ rdbmsTypesRemoveOwnedRefAttrs, s3V2DirectoryPruneObjectPrefix, updateHiveProcessNameWithQualifiedName, entityCorrelationManager);
if (context.isHivePreprocessEnabled()) {
preprocessHiveTypes(context);
@@ -996,6 +1001,10 @@
rdbmsTypeRemoveOwnedRefAttrs(context);
}
+ if (s3V2DirectoryPruneObjectPrefix) {
+ pruneObjectPrefixForS3V2Directory(context);
+ }
+
context.moveRegisteredReferredEntities();
if (context.isHivePreprocessEnabled() && CollectionUtils.isNotEmpty(context.getEntities()) && context.getEntities().size() > 1) {
@@ -1040,6 +1049,28 @@
}
}
+ private void pruneObjectPrefixForS3V2Directory(PreprocessorContext context) {
+ List<AtlasEntity> entities = new ArrayList<>();
+
+ if (CollectionUtils.isNotEmpty(context.getEntities())) {
+ entities.addAll(context.getEntities());
+ }
+
+ if (MapUtils.isNotEmpty(context.getReferredEntities())) {
+ entities.addAll(context.getReferredEntities().values());
+ }
+
+ if (CollectionUtils.isNotEmpty(entities)) {
+ for (AtlasEntity entity : entities) {
+ EntityPreprocessor preprocessor = EntityPreprocessor.getS3V2Preprocessor(entity.getTypeName());
+
+ if (preprocessor != null) {
+ preprocessor.preprocess(entity, context);
+ }
+ }
+ }
+ }
+
private void preprocessHiveTypes(PreprocessorContext context) {
List<AtlasEntity> entities = context.getEntities();
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/AWSS3V2Preprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/AWSS3V2Preprocessor.java
new file mode 100644
index 0000000..6102572
--- /dev/null
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/AWSS3V2Preprocessor.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification.preprocessor;
+
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.hadoop.fs.Path;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AWSS3V2Preprocessor {
+ private static final Logger LOG = LoggerFactory.getLogger(AWSS3V2Preprocessor.class);
+
+ private static final String AWS_S3_V2_DIR_TYPE = "aws_s3_v2_directory";
+ private static final String ATTRIBUTE_OBJECT_PREFIX = "objectPrefix";
+ private static final String SCHEME_SEPARATOR = "://";
+
+ static class AWSS3V2DirectoryPreprocessor extends EntityPreprocessor {
+ protected AWSS3V2DirectoryPreprocessor() {
+ super(AWS_S3_V2_DIR_TYPE);
+ }
+
+ @Override
+ public void preprocess(AtlasEntity entity, PreprocessorContext context) {
+ if (context.getS3V2DirectoryPruneObjectPrefix()) {
+ String qualifiedName = (String) entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME);
+ String objectPrefix = (String) entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX);
+
+ if (isObjectPrefixPruneNeeded(qualifiedName, objectPrefix)) {
+ if (objectPrefix.lastIndexOf(Path.SEPARATOR) == -1) {
+ objectPrefix = Path.SEPARATOR;
+ } else {
+ if (doesEndsWithPathSeparator(objectPrefix)) {
+ objectPrefix = removeLastPathSeparator(objectPrefix);
+ }
+
+ objectPrefix = objectPrefix.substring(0, objectPrefix.lastIndexOf(Path.SEPARATOR) + 1);
+ }
+
+ LOG.info("Aws S3 V2 Preprocessor: Pruning {} from {} to {}", ATTRIBUTE_OBJECT_PREFIX + QNAME_SEP_CLUSTER_NAME + AWS_S3_V2_DIR_TYPE,
+ entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), objectPrefix);
+
+ entity.setAttribute(ATTRIBUTE_OBJECT_PREFIX, objectPrefix);
+ }
+ }
+ }
+
+ private boolean isObjectPrefixPruneNeeded(String qualifiedName, String objectPrefix) {
+ return (StringUtils.isNotBlank(qualifiedName)
+ && StringUtils.isNotBlank(objectPrefix)
+ && qualifiedName.contains(getSchemeAndBucket(qualifiedName) + objectPrefix + QNAME_SEP_CLUSTER_NAME));
+ }
+
+ private String getSchemeAndBucket(String qualifiedName) {
+ String ret = "";
+
+ if (StringUtils.isNotEmpty(qualifiedName) && qualifiedName.contains(SCHEME_SEPARATOR)) {
+ int schemeSeparatorEndPosition = qualifiedName.indexOf(SCHEME_SEPARATOR) + SCHEME_SEPARATOR.length();
+ int bucketEndPosition = qualifiedName.indexOf(Path.SEPARATOR, schemeSeparatorEndPosition);
+ ret = qualifiedName.substring(0, bucketEndPosition);
+ }
+
+ return ret;
+ }
+
+ private boolean doesEndsWithPathSeparator(String path) {
+ return path.endsWith(Path.SEPARATOR);
+ }
+
+ private String removeLastPathSeparator(String path) {
+ return StringUtils.chop(path);
+ }
+ }
+
+}
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
index 7f0cafe..f8eac4c 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
@@ -64,8 +64,9 @@
public static final char QNAME_SEP_ENTITY_NAME = '.';
public static final String QNAME_SD_SUFFIX = "_storage";
- private static final Map<String, EntityPreprocessor> HIVE_PREPROCESSOR_MAP = new HashMap<>();
- private static final Map<String, EntityPreprocessor> RDBMS_PREPROCESSOR_MAP = new HashMap<>();
+ private static final Map<String, EntityPreprocessor> HIVE_PREPROCESSOR_MAP = new HashMap<>();
+ private static final Map<String, EntityPreprocessor> RDBMS_PREPROCESSOR_MAP = new HashMap<>();
+ private static final Map<String, EntityPreprocessor> AWS_S3_V2_PREPROCESSOR_MAP = new HashMap<>();
private final String typeName;
@@ -88,6 +89,10 @@
new RdbmsPreprocessor.RdbmsTablePreprocessor()
};
+ EntityPreprocessor[] s3V2Preprocessors = new EntityPreprocessor[] {
+ new AWSS3V2Preprocessor.AWSS3V2DirectoryPreprocessor()
+ };
+
for (EntityPreprocessor preprocessor : hivePreprocessors) {
HIVE_PREPROCESSOR_MAP.put(preprocessor.getTypeName(), preprocessor);
}
@@ -95,6 +100,10 @@
for (EntityPreprocessor preprocessor : rdbmsPreprocessors) {
RDBMS_PREPROCESSOR_MAP.put(preprocessor.getTypeName(), preprocessor);
}
+
+ for (EntityPreprocessor preprocessor : s3V2Preprocessors) {
+ AWS_S3_V2_PREPROCESSOR_MAP.put(preprocessor.getTypeName(), preprocessor);
+ }
}
protected EntityPreprocessor(String typeName) {
@@ -116,6 +125,10 @@
return typeName != null ? RDBMS_PREPROCESSOR_MAP.get(typeName) : null;
}
+ public static EntityPreprocessor getS3V2Preprocessor(String typeName) {
+ return typeName != null ? AWS_S3_V2_PREPROCESSOR_MAP.get(typeName) : null;
+ }
+
public static String getQualifiedName(AtlasEntity entity) {
Object obj = entity != null ? entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME) : null;
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
index 59f6440..f930d9f 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
@@ -63,6 +63,7 @@
private final boolean updateHiveProcessNameWithQualifiedName;
private final boolean hiveTypesRemoveOwnedRefAttrs;
private final boolean rdbmsTypesRemoveOwnedRefAttrs;
+ private final boolean s3V2DirectoryPruneObjectPrefix;
private final boolean isHivePreProcessEnabled;
private final Set<String> ignoredEntities = new HashSet<>();
private final Set<String> prunedEntities = new HashSet<>();
@@ -73,7 +74,7 @@
private final EntityCorrelationManager correlationManager;
private List<AtlasEntity> postUpdateEntities = null;
- public PreprocessorContext(AtlasKafkaMessage<HookNotification> kafkaMessage, AtlasTypeRegistry typeRegistry, List<Pattern> hiveTablesToIgnore, List<Pattern> hiveTablesToPrune, Map<String, PreprocessAction> hiveTablesCache, List<String> hiveDummyDatabasesToIgnore, List<String> hiveDummyTablesToIgnore, List<String> hiveTablePrefixesToIgnore, boolean hiveTypesRemoveOwnedRefAttrs, boolean rdbmsTypesRemoveOwnedRefAttrs, boolean updateHiveProcessNameWithQualifiedName, EntityCorrelationManager correlationManager) {
+ public PreprocessorContext(AtlasKafkaMessage<HookNotification> kafkaMessage, AtlasTypeRegistry typeRegistry, List<Pattern> hiveTablesToIgnore, List<Pattern> hiveTablesToPrune, Map<String, PreprocessAction> hiveTablesCache, List<String> hiveDummyDatabasesToIgnore, List<String> hiveDummyTablesToIgnore, List<String> hiveTablePrefixesToIgnore, boolean hiveTypesRemoveOwnedRefAttrs, boolean rdbmsTypesRemoveOwnedRefAttrs, boolean s3V2DirectoryPruneObjectPrefix, boolean updateHiveProcessNameWithQualifiedName, EntityCorrelationManager correlationManager) {
this.kafkaMessage = kafkaMessage;
this.typeRegistry = typeRegistry;
this.hiveTablesToIgnore = hiveTablesToIgnore;
@@ -84,6 +85,7 @@
this.hiveTablePrefixesToIgnore = hiveTablePrefixesToIgnore;
this.hiveTypesRemoveOwnedRefAttrs = hiveTypesRemoveOwnedRefAttrs;
this.rdbmsTypesRemoveOwnedRefAttrs = rdbmsTypesRemoveOwnedRefAttrs;
+ this.s3V2DirectoryPruneObjectPrefix = s3V2DirectoryPruneObjectPrefix;
this.updateHiveProcessNameWithQualifiedName = updateHiveProcessNameWithQualifiedName;
final HookNotification message = kafkaMessage.getMessage();
@@ -124,6 +126,10 @@
public boolean getRdbmsTypesRemoveOwnedRefAttrs() { return rdbmsTypesRemoveOwnedRefAttrs; }
+ public boolean getS3V2DirectoryPruneObjectPrefix() {
+ return s3V2DirectoryPruneObjectPrefix;
+ }
+
public boolean isHivePreprocessEnabled() {
return isHivePreProcessEnabled;
}
diff --git a/webapp/src/test/java/org/apache/atlas/notification/preprocessor/AWSS3V2PreprocessorTest.java b/webapp/src/test/java/org/apache/atlas/notification/preprocessor/AWSS3V2PreprocessorTest.java
new file mode 100644
index 0000000..9c6c92a
--- /dev/null
+++ b/webapp/src/test/java/org/apache/atlas/notification/preprocessor/AWSS3V2PreprocessorTest.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification.preprocessor;
+
+import org.apache.atlas.kafka.AtlasKafkaMessage;
+import org.apache.atlas.kafka.KafkaNotification;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.model.notification.HookNotification;
+import org.apache.atlas.utils.AtlasPathExtractorUtil;
+import org.apache.atlas.utils.PathExtractorContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.Test;
+import org.apache.hadoop.fs.Path;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
+
+public class AWSS3V2PreprocessorTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AWSS3V2PreprocessorTest.class);
+ private static final String METADATA_NAMESPACE = "cm";
+ private static final String QNAME_METADATA_NAMESPACE = '@' + METADATA_NAMESPACE;
+ private static final String AWS_S3_MODEL_VERSION_V2 = "V2";
+
+ private static final String SCHEME_SEPARATOR = "://";
+ private static final String S3_SCHEME = "s3" + SCHEME_SEPARATOR;
+ private static final String S3A_SCHEME = "s3a" + SCHEME_SEPARATOR;
+ private static final String ABFS_SCHEME = "abfs" + SCHEME_SEPARATOR;
+
+ private static final String ATTRIBUTE_NAME = "name";
+ private static final String ATTRIBUTE_OBJECT_PREFIX = "objectPrefix";
+ private static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
+
+ private static final List<String> EMPTY_STR_LIST = new ArrayList<>();
+ private static final List<Pattern> EMPTY_PATTERN_LIST = new ArrayList<>();
+
+ @Test
+ public void testS2V2DirectoryPreprocessorForOtherTypes() {
+ PathExtractorContext extractorContext = new PathExtractorContext(METADATA_NAMESPACE);
+ final String ABFS_PATH = ABFS_SCHEME + "data@razrangersan.dfs.core.windows.net/tmp/cdp-demo/sample.csv";
+ Path path = new Path(ABFS_PATH);
+ AtlasEntityWithExtInfo entityWithExtInfo = AtlasPathExtractorUtil.getPathEntity(path, extractorContext);
+ AtlasEntity entity = entityWithExtInfo.getEntity();
+
+ assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), null);
+
+ HookNotification hookNotification = new HookNotification.EntityCreateRequestV2("test", new AtlasEntity.AtlasEntitiesWithExtInfo(entity));
+ AtlasKafkaMessage<HookNotification> kafkaMsg = new AtlasKafkaMessage(hookNotification, -1, KafkaNotification.ATLAS_HOOK_TOPIC, -1);
+ PreprocessorContext context = new PreprocessorContext(kafkaMsg, null, EMPTY_PATTERN_LIST, EMPTY_PATTERN_LIST, null,
+ EMPTY_STR_LIST, EMPTY_STR_LIST, EMPTY_STR_LIST, false,
+ false, true, false, null);
+
+ EntityPreprocessor preprocessor = new AWSS3V2Preprocessor.AWSS3V2DirectoryPreprocessor();
+
+ preprocessor.preprocess(entity, context);
+
+ assertNotEquals(entity.getTypeName(), preprocessor.getTypeName());
+ }
+
+ @Test
+ public void testS2V2DirectoryPreprocessorForFullPath() {
+ PathExtractorContext extractorContext = new PathExtractorContext(METADATA_NAMESPACE, AWS_S3_MODEL_VERSION_V2);
+ final String S3_PATH = S3A_SCHEME + "aws_bucket1/1234567890/test/data1";
+ Path path = new Path(S3_PATH);
+ AtlasEntityWithExtInfo entityWithExtInfo = AtlasPathExtractorUtil.getPathEntity(path, extractorContext);
+ AtlasEntity entity = entityWithExtInfo.getEntity();
+
+ assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/1234567890/test/data1/");
+
+ HookNotification hookNotification = new HookNotification.EntityCreateRequestV2("test", new AtlasEntity.AtlasEntitiesWithExtInfo(entity));
+ AtlasKafkaMessage<HookNotification> kafkaMsg = new AtlasKafkaMessage(hookNotification, -1, KafkaNotification.ATLAS_HOOK_TOPIC, -1);
+ PreprocessorContext context = new PreprocessorContext(kafkaMsg, null, EMPTY_PATTERN_LIST, EMPTY_PATTERN_LIST, null,
+ EMPTY_STR_LIST, EMPTY_STR_LIST, EMPTY_STR_LIST, false,
+ false, true, false, null);
+
+ EntityPreprocessor preprocessor = new AWSS3V2Preprocessor.AWSS3V2DirectoryPreprocessor();
+
+ preprocessor.preprocess(entity, context);
+
+ assertEquals(entity.getTypeName(), preprocessor.getTypeName());
+ assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/1234567890/test/");
+ assertEquals(entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME), S3A_SCHEME + "aws_bucket1/1234567890/test/data1/" + QNAME_METADATA_NAMESPACE);
+ assertEquals(entity.getAttribute(ATTRIBUTE_NAME), "data1");
+ }
+
+ @Test
+ public void testS2V2DirectoryPreprocessorForRootLevelDirectory() {
+ PathExtractorContext extractorContext = new PathExtractorContext(METADATA_NAMESPACE, AWS_S3_MODEL_VERSION_V2);
+ final String S3_PATH = S3A_SCHEME + "aws_bucket1/root1";
+ Path path = new Path(S3_PATH);
+ AtlasEntityWithExtInfo entityWithExtInfo = AtlasPathExtractorUtil.getPathEntity(path, extractorContext);
+ AtlasEntity entity = entityWithExtInfo.getEntity();
+
+ assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/root1/");
+
+ HookNotification hookNotification = new HookNotification.EntityCreateRequestV2("test", new AtlasEntity.AtlasEntitiesWithExtInfo(entity));
+ AtlasKafkaMessage<HookNotification> kafkaMsg = new AtlasKafkaMessage(hookNotification, -1, KafkaNotification.ATLAS_HOOK_TOPIC, -1);
+ PreprocessorContext context = new PreprocessorContext(kafkaMsg, null, EMPTY_PATTERN_LIST, EMPTY_PATTERN_LIST, null,
+ EMPTY_STR_LIST, EMPTY_STR_LIST, EMPTY_STR_LIST, false,
+ false, true, false, null);
+
+ EntityPreprocessor preprocessor = new AWSS3V2Preprocessor.AWSS3V2DirectoryPreprocessor();
+
+ preprocessor.preprocess(entity, context);
+
+ assertEquals(entity.getTypeName(), preprocessor.getTypeName());
+ assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/");
+ assertEquals(entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME), S3A_SCHEME + "aws_bucket1/root1/" + QNAME_METADATA_NAMESPACE);
+ assertEquals(entity.getAttribute(ATTRIBUTE_NAME), "root1");
+ }
+
+ @Test
+ public void testS2V2DirectoryPreprocessorWithSameDirNames() {
+ PathExtractorContext extractorContext = new PathExtractorContext(METADATA_NAMESPACE, AWS_S3_MODEL_VERSION_V2);
+ final String S3_PATH = S3_SCHEME + "temp/temp/temp/temp/";
+ Path path = new Path(S3_PATH);
+ AtlasEntityWithExtInfo entityWithExtInfo = AtlasPathExtractorUtil.getPathEntity(path, extractorContext);
+ AtlasEntity entity = entityWithExtInfo.getEntity();
+
+ assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/temp/temp/temp/");
+
+ HookNotification hookNotification = new HookNotification.EntityCreateRequestV2("test", new AtlasEntity.AtlasEntitiesWithExtInfo(entity));
+ AtlasKafkaMessage<HookNotification> kafkaMsg = new AtlasKafkaMessage(hookNotification, -1, KafkaNotification.ATLAS_HOOK_TOPIC, -1);
+ PreprocessorContext context = new PreprocessorContext(kafkaMsg, null, EMPTY_PATTERN_LIST, EMPTY_PATTERN_LIST, null,
+ EMPTY_STR_LIST, EMPTY_STR_LIST, EMPTY_STR_LIST, false,
+ false, true, false, null);
+
+ EntityPreprocessor preprocessor = new AWSS3V2Preprocessor.AWSS3V2DirectoryPreprocessor();
+
+ preprocessor.preprocess(entity, context);
+
+ assertEquals(entity.getTypeName(), preprocessor.getTypeName());
+ assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/temp/temp/");
+ assertEquals(entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME), S3_SCHEME + "temp/temp/temp/temp/" + QNAME_METADATA_NAMESPACE);
+ assertEquals(entity.getAttribute(ATTRIBUTE_NAME), "temp");
+ }
+
+ @Test
+ public void testS2V2DirectoryPreprocessorForHookWithCorrectObjectPrefix() {
+ PathExtractorContext extractorContext = new PathExtractorContext(METADATA_NAMESPACE, AWS_S3_MODEL_VERSION_V2);
+ final String S3_PATH = S3A_SCHEME + "aws_bucket1/root1";
+ Path path = new Path(S3_PATH);
+ AtlasEntityWithExtInfo entityWithExtInfo = AtlasPathExtractorUtil.getPathEntity(path, extractorContext);
+ AtlasEntity entity = entityWithExtInfo.getEntity();
+
+ entity.setAttribute(ATTRIBUTE_OBJECT_PREFIX, "/");
+ assertNotEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/root1/");
+
+ HookNotification hookNotification = new HookNotification.EntityCreateRequestV2("test", new AtlasEntity.AtlasEntitiesWithExtInfo(entity));
+ AtlasKafkaMessage<HookNotification> kafkaMsg = new AtlasKafkaMessage(hookNotification, -1, KafkaNotification.ATLAS_HOOK_TOPIC, -1);
+ PreprocessorContext context = new PreprocessorContext(kafkaMsg, null, EMPTY_PATTERN_LIST, EMPTY_PATTERN_LIST, null,
+ EMPTY_STR_LIST, EMPTY_STR_LIST, EMPTY_STR_LIST, false,
+ false, true, false, null);
+
+ EntityPreprocessor preprocessor = new AWSS3V2Preprocessor.AWSS3V2DirectoryPreprocessor();
+
+ preprocessor.preprocess(entity, context);
+
+ assertEquals(entity.getTypeName(), preprocessor.getTypeName());
+ assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/");
+ assertEquals(entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME), S3A_SCHEME + "aws_bucket1/root1/" + QNAME_METADATA_NAMESPACE);
+ assertEquals(entity.getAttribute(ATTRIBUTE_NAME), "root1");
+ }
+
+ @Test
+ public void testS2V2DirectoryPreprocessorForHookWithCorrectObjectPrefixHavingSameDirNames() {
+ PathExtractorContext extractorContext = new PathExtractorContext(METADATA_NAMESPACE, AWS_S3_MODEL_VERSION_V2);
+ final String S3_PATH = S3A_SCHEME + "temp/temp/temp/temp/";
+ Path path = new Path(S3_PATH);
+ AtlasEntityWithExtInfo entityWithExtInfo = AtlasPathExtractorUtil.getPathEntity(path, extractorContext);
+ AtlasEntity entity = entityWithExtInfo.getEntity();
+
+ assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/temp/temp/temp/");
+ entity.setAttribute(ATTRIBUTE_OBJECT_PREFIX, "/temp/temp/");
+ assertNotEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/temp/temp/temp/");
+
+ HookNotification hookNotification = new HookNotification.EntityCreateRequestV2("test", new AtlasEntity.AtlasEntitiesWithExtInfo(entity));
+ AtlasKafkaMessage<HookNotification> kafkaMsg = new AtlasKafkaMessage(hookNotification, -1, KafkaNotification.ATLAS_HOOK_TOPIC, -1);
+ PreprocessorContext context = new PreprocessorContext(kafkaMsg, null, EMPTY_PATTERN_LIST, EMPTY_PATTERN_LIST, null,
+ EMPTY_STR_LIST, EMPTY_STR_LIST, EMPTY_STR_LIST, false,
+ false, true, false, null);
+
+ EntityPreprocessor preprocessor = new AWSS3V2Preprocessor.AWSS3V2DirectoryPreprocessor();
+
+ preprocessor.preprocess(entity, context);
+
+ assertEquals(entity.getTypeName(), preprocessor.getTypeName());
+ assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/temp/temp/");
+ assertEquals(entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME), S3A_SCHEME + "temp/temp/temp/temp/" + QNAME_METADATA_NAMESPACE);
+ assertEquals(entity.getAttribute(ATTRIBUTE_NAME), "temp");
+ }
+}