blob: 3ce3942167592d59000bbbcfbf51d9dc4dd746ba [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.hive.hook.utils;
import com.google.common.annotations.VisibleForTesting;
import org.apache.atlas.hive.hook.events.BaseHiveEvent;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.notification.HookNotification;
import org.apache.atlas.utils.AtlasPathExtractorUtil;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
 * {@link EntityFilter} that keeps only entities and object ids whose type name is in a
 * retained-type set and drops everything else from a list of hook notifications.
 *
 * <p>The retained set always contains the default process, lineage, DDL and path types listed
 * below; callers may add further type names through the constructor. The set is held per
 * instance, so types passed to one filter do not affect any other filter.
 */
public class HiveDDLEntityFilter implements EntityFilter {
    private static final Logger LOG = LoggerFactory.getLogger(HiveDDLEntityFilter.class);

    /** Path entity types that are retained by default. Never mutated after initialization. */
    private static final Set<String> defaultPathTypesToRetain = new HashSet<>(Arrays.asList(
            AtlasPathExtractorUtil.HDFS_TYPE_PATH,
            AtlasPathExtractorUtil.ADLS_GEN2_DIRECTORY,
            AtlasPathExtractorUtil.GCS_VIRTUAL_DIR,
            AtlasPathExtractorUtil.AWS_S3_V2_PSEUDO_DIR,
            AtlasPathExtractorUtil.AWS_S3_PSEUDO_DIR));

    /** All types retained by default (Hive types plus the default path types). Never mutated. */
    private static final Set<String> defaultTypesToRetain = buildDefaultTypesToRetain();

    /** Types retained by this instance: the defaults plus any additional types given at construction. */
    private final Set<String> typesToRetain;

    /**
     * Creates a filter that retains the default types plus {@code additionalTypesToRetain}.
     *
     * @param additionalTypesToRetain extra type names to retain; may be {@code null} or empty
     */
    public HiveDDLEntityFilter(List<String> additionalTypesToRetain) {
        // Copy the defaults so that additions stay local to this instance.
        this.typesToRetain = new HashSet<>(defaultTypesToRetain);

        if (CollectionUtils.isEmpty(additionalTypesToRetain)) {
            return;
        }

        typesToRetain.addAll(additionalTypesToRetain);

        // Pass the set itself: an Object[] would be expanded as the varargs list and only its
        // first element would be substituted into the single placeholder.
        LOG.info("Types retained: {}", typesToRetain);
    }

    /**
     * Filters every notification in {@code incoming}, omitting notifications that end up empty
     * or that are of a type this filter does not handle.
     *
     * @param incoming notifications to filter; may be {@code null} or empty
     * @return {@code incoming} itself when it is {@code null} or empty, otherwise a new list
     *         holding the filtered notifications
     */
    public List<HookNotification> apply(List<HookNotification> incoming) {
        if (CollectionUtils.isEmpty(incoming)) {
            return incoming;
        }

        List<HookNotification> ret = new ArrayList<>();

        for (HookNotification notification : incoming) {
            HookNotification filteredNotification = apply(notification);

            if (filteredNotification != null) {
                ret.add(filteredNotification);
            }
        }

        return ret;
    }

    /**
     * Filters a single entity together with its referred entities.
     *
     * @param incoming entity with extended info; may be {@code null}
     * @return a new wrapper holding the entity and its retained referred entities, or
     *         {@code null} when {@code incoming} is {@code null}, has no entity, or the entity's
     *         type is not retained
     */
    @VisibleForTesting
    AtlasEntity.AtlasEntityWithExtInfo apply(AtlasEntity.AtlasEntityWithExtInfo incoming) {
        if (incoming == null) {
            return null;
        }

        AtlasEntity entity = filter(incoming.getEntity());

        if (entity == null) {
            return null;
        }

        AtlasEntity.AtlasEntityWithExtInfo ret = new AtlasEntity.AtlasEntityWithExtInfo();

        ret.setEntity(entity);

        Map<String, AtlasEntity> refEntities = filter(incoming.getReferredEntities());

        if (!MapUtils.isEmpty(refEntities)) {
            ret.setReferredEntities(refEntities);
        }

        return ret;
    }

    /**
     * Filters both the entity list and the referred entities of {@code incoming}.
     *
     * <p>Retained entities are the same object instances as in {@code incoming}; references in
     * their relationship attributes may be modified in place (see
     * {@link #inferObjectTypeResetGuid(Object)}).
     *
     * @param incoming entities with extended info; may be {@code null}
     * @return {@code null} when {@code incoming} is {@code null}, otherwise a new container with
     *         the retained entities and retained referred entities
     */
    @VisibleForTesting
    public AtlasEntity.AtlasEntitiesWithExtInfo apply(AtlasEntity.AtlasEntitiesWithExtInfo incoming) {
        if (incoming == null) {
            return incoming;
        }

        AtlasEntity.AtlasEntitiesWithExtInfo ret = new AtlasEntity.AtlasEntitiesWithExtInfo();

        ret.setEntities(filter(incoming.getEntities()));
        ret.setReferredEntities(filter(incoming.getReferredEntities()));

        return ret;
    }

    /**
     * Keeps only the object ids whose type is retained.
     *
     * @param incoming object ids to filter; may be {@code null} or empty
     * @return {@code null} when {@code incoming} is {@code null} or empty, otherwise a new
     *         (possibly empty) list of the retained object ids
     */
    @VisibleForTesting
    List<AtlasObjectId> applyForObjectIds(List<AtlasObjectId> incoming) {
        if (CollectionUtils.isEmpty(incoming)) {
            return null;
        }

        List<AtlasObjectId> ret = new ArrayList<>();

        for (AtlasObjectId o : incoming) {
            if (filterObjectId(o) != null) {
                ret.add(o);
            }
        }

        return ret;
    }

    /** Builds the default retained-type set: the Hive types plus the default path types. */
    private static Set<String> buildDefaultTypesToRetain() {
        Set<String> ret = new HashSet<>(Arrays.asList(
                BaseHiveEvent.HIVE_TYPE_PROCESS,
                BaseHiveEvent.HIVE_TYPE_PROCESS_EXECUTION,
                BaseHiveEvent.HIVE_TYPE_COLUMN_LINEAGE,
                BaseHiveEvent.HIVE_DB_DDL,
                BaseHiveEvent.HIVE_TABLE_DDL));

        ret.addAll(defaultPathTypesToRetain);

        return ret;
    }

    /** Returns {@code o} when it is non-null and of a retained type, otherwise {@code null}. */
    private AtlasObjectId filterObjectId(AtlasObjectId o) {
        if (o != null && typesToRetain.contains(o.getTypeName())) {
            return o;
        }

        return null;
    }

    /**
     * Filters a map of entities by type and re-keys the survivors by their guid.
     *
     * @return {@code null} when {@code incoming} is {@code null} or empty, otherwise a new map
     *         of guid to retained entity
     */
    private Map<String, AtlasEntity> filter(Map<String, AtlasEntity> incoming) {
        if (MapUtils.isEmpty(incoming)) {
            return null;
        }

        return incoming.values()
                .stream()
                // Null values are skipped rather than dereferenced.
                .filter(x -> x != null && typesToRetain.contains(x.getTypeName()))
                .collect(Collectors.toMap(AtlasEntity::getGuid, Function.identity()));
    }

    /**
     * Filters a list of entities by type and then visits every relationship attribute of the
     * retained entities so that references to non-retained types can have their guid cleared.
     *
     * @return {@code null} when {@code incoming} is {@code null}, otherwise a new list of the
     *         retained entities
     */
    private List<AtlasEntity> filter(List<AtlasEntity> incoming) {
        if (incoming == null) {
            return null;
        }

        List<AtlasEntity> ret = incoming.stream()
                .filter(x -> x != null && typesToRetain.contains(x.getTypeName()))
                .collect(Collectors.toList());

        for (AtlasEntity e : ret) {
            Map<String, Object> relationshipAttributes = e.getRelationshipAttributes();

            // An entity without relationship attributes has nothing to adjust.
            if (relationshipAttributes == null) {
                continue;
            }

            for (Object o : relationshipAttributes.values()) {
                if (o instanceof List) {
                    for (Object ox : (List<?>) o) {
                        inferObjectTypeResetGuid(ox);
                    }
                } else {
                    inferObjectTypeResetGuid(o);
                }
            }
        }

        return ret;
    }

    /**
     * Clears the guid of a reference when the referenced type is not retained and the reference
     * carries unique attributes. Handles references given as {@link AtlasObjectId} or as a
     * {@link Map}; any other value (including {@code null}) is ignored.
     *
     * <p>NOTE(review): presumably the guid is dropped so the reference is resolved through its
     * unique attributes once the referenced entity has been filtered out of the payload --
     * confirm against the consumer of these notifications.
     */
    private void inferObjectTypeResetGuid(Object o) {
        if (o instanceof AtlasObjectId) {
            AtlasObjectId oid = (AtlasObjectId) o;

            if (oid.getUniqueAttributes() != null && !typesToRetain.contains(oid.getTypeName())) {
                oid.setGuid(null);
            }
        } else if (o instanceof Map) {
            Map<?, ?> hm = (Map<?, ?>) o;

            if (!hm.containsKey(BaseHiveEvent.ATTRIBUTE_GUID)) {
                return;
            }

            // A missing type name yields null, which is never in the retained set.
            Object typeName = hm.get(AtlasObjectId.KEY_TYPENAME);

            if (hm.containsKey(BaseHiveEvent.ATTRIBUTE_UNIQUE_ATTRIBUTES) && !typesToRetain.contains(typeName)) {
                hm.remove(BaseHiveEvent.ATTRIBUTE_GUID);
            }
        }
    }

    /** Returns {@code incoming} when it is non-null and of a retained type, otherwise {@code null}. */
    private AtlasEntity filter(AtlasEntity incoming) {
        if (incoming == null) {
            return null;
        }

        return typesToRetain.contains(incoming.getTypeName()) ? incoming : null;
    }

    /**
     * Dispatches a notification to the filter for its concrete V2 request type.
     *
     * @return the filtered notification, or {@code null} when nothing is retained or the
     *         notification is not one of the handled V2 request types
     */
    private HookNotification apply(HookNotification notification) {
        if (notification instanceof HookNotification.EntityCreateRequestV2) {
            return apply((HookNotification.EntityCreateRequestV2) notification);
        }

        if (notification instanceof HookNotification.EntityUpdateRequestV2) {
            return apply((HookNotification.EntityUpdateRequestV2) notification);
        }

        if (notification instanceof HookNotification.EntityPartialUpdateRequestV2) {
            return apply((HookNotification.EntityPartialUpdateRequestV2) notification);
        }

        if (notification instanceof HookNotification.EntityDeleteRequestV2) {
            return apply((HookNotification.EntityDeleteRequestV2) notification);
        }

        return null;
    }

    /** Filters a create request; returns {@code null} when no entity is retained. */
    private HookNotification.EntityCreateRequestV2 apply(HookNotification.EntityCreateRequestV2 notification) {
        AtlasEntity.AtlasEntitiesWithExtInfo entities = apply(notification.getEntities());

        if (entities == null || CollectionUtils.isEmpty(entities.getEntities())) {
            return null;
        }

        return new HookNotification.EntityCreateRequestV2(notification.getUser(), entities);
    }

    /** Filters an update request; returns {@code null} when no entity is retained. */
    private HookNotification.EntityUpdateRequestV2 apply(HookNotification.EntityUpdateRequestV2 notification) {
        AtlasEntity.AtlasEntitiesWithExtInfo entities = apply(notification.getEntities());

        if (entities == null || CollectionUtils.isEmpty(entities.getEntities())) {
            return null;
        }

        return new HookNotification.EntityUpdateRequestV2(notification.getUser(), entities);
    }

    /**
     * Filters a partial-update request; returns {@code null} when either the target object id
     * or the entity itself is not retained.
     */
    private HookNotification.EntityPartialUpdateRequestV2 apply(HookNotification.EntityPartialUpdateRequestV2 notification) {
        AtlasObjectId objectId = filterObjectId(notification.getEntityId());

        if (objectId == null) {
            return null;
        }

        AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = apply(notification.getEntity());

        if (entityWithExtInfo == null) {
            return null;
        }

        return new HookNotification.EntityPartialUpdateRequestV2(notification.getUser(), objectId, entityWithExtInfo);
    }

    /** Filters a delete request; returns {@code null} when no object id is retained. */
    private HookNotification.EntityDeleteRequestV2 apply(HookNotification.EntityDeleteRequestV2 notification) {
        List<AtlasObjectId> objectIds = applyForObjectIds(notification.getEntities());

        if (CollectionUtils.isEmpty(objectIds)) {
            return null;
        }

        return new HookNotification.EntityDeleteRequestV2(notification.getUser(), objectIds);
    }
}