/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasClientV2;
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasException;
import org.apache.atlas.RequestContext;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.kafka.AtlasKafkaMessage;
import org.apache.atlas.listener.ActiveStateChangeHandler;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.notification.HookNotification;
import org.apache.atlas.model.notification.HookNotification.EntityCreateRequestV2;
import org.apache.atlas.model.notification.HookNotification.EntityDeleteRequestV2;
import org.apache.atlas.model.notification.HookNotification.EntityUpdateRequestV2;
import org.apache.atlas.model.notification.HookNotification.EntityPartialUpdateRequestV2;
import org.apache.atlas.notification.NotificationInterface.NotificationType;
import org.apache.atlas.notification.preprocessor.EntityPreprocessor;
import org.apache.atlas.notification.preprocessor.PreprocessorContext;
import org.apache.atlas.notification.preprocessor.PreprocessorContext.PreprocessAction;
import org.apache.atlas.repository.store.graph.EntityCorrelationStore;
import org.apache.atlas.util.AtlasMetricsCounter;
import org.apache.atlas.utils.AtlasJson;
import org.apache.atlas.utils.LruCache;
import org.apache.atlas.util.AtlasMetricsUtil;
import org.apache.atlas.util.AtlasMetricsUtil.NotificationStat;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest;
import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityDeleteRequest;
import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityPartialUpdateRequest;
import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityUpdateRequest;
import org.apache.atlas.repository.converters.AtlasInstanceConverter;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.v2.AtlasEntityStream;
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
import org.apache.atlas.service.Service;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.utils.AtlasPerfTracer;
import org.apache.atlas.web.filters.AuditFilter;
import org.apache.atlas.web.filters.AuditFilter.AuditLog;
import org.apache.atlas.web.service.ServiceState;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.collections4.map.PassiveExpiringMap;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.DependsOn;
import org.springframework.core.annotation.Order;
import org.springframework.security.authentication.UsernamePasswordAuthenticationToken;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.GrantedAuthority;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.security.core.userdetails.User;
import org.springframework.security.core.userdetails.UserDetails;
import org.springframework.stereotype.Component;
import javax.inject.Inject;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import static org.apache.atlas.model.instance.AtlasObjectId.KEY_GUID;
import static org.apache.atlas.model.instance.AtlasObjectId.KEY_TYPENAME;
import static org.apache.atlas.model.instance.AtlasObjectId.KEY_UNIQUE_ATTRIBUTES;
import static org.apache.atlas.notification.preprocessor.EntityPreprocessor.TYPE_HIVE_PROCESS;
import static org.apache.atlas.web.security.AtlasAbstractAuthenticationProvider.getAuthoritiesFromUGI;
/**
* Consumer of notifications from hooks, e.g., the Hive hook.
*/
@Component
@Order(5)
@DependsOn(value = {"atlasTypeDefStoreInitializer", "atlasTypeDefGraphStoreV2"})
public class NotificationHookConsumer implements Service, ActiveStateChangeHandler {
private static final Logger LOG = LoggerFactory.getLogger(NotificationHookConsumer.class);
private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger(NotificationHookConsumer.class);
private static final Logger FAILED_LOG = LoggerFactory.getLogger("FAILED");
private static final Logger LARGE_MESSAGES_LOG = LoggerFactory.getLogger("LARGE_MESSAGES");
private static final int SC_OK = 200;
private static final int SC_BAD_REQUEST = 400;
private static final String TYPE_HIVE_COLUMN_LINEAGE = "hive_column_lineage";
private static final String ATTRIBUTE_INPUTS = "inputs";
private static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
private static final String EXCEPTION_CLASS_NAME_JANUSGRAPH_EXCEPTION = "JanusGraphException";
private static final String EXCEPTION_CLASS_NAME_PERMANENTLOCKING_EXCEPTION = "PermanentLockingException";
private static final int KAFKA_CONSUMER_SHUTDOWN_WAIT = 30000;
// from org.apache.hadoop.hive.ql.parse.SemanticAnalyzer
public static final String DUMMY_DATABASE = "_dummy_database";
public static final String DUMMY_TABLE = "_dummy_table";
public static final String VALUES_TMP_TABLE_NAME_PREFIX = "Values__Tmp__Table__";
private static final String THREADNAME_PREFIX = NotificationHookConsumer.class.getSimpleName();
public static final String CONSUMER_THREADS_PROPERTY = "atlas.notification.hook.numthreads";
public static final String CONSUMER_RETRIES_PROPERTY = "atlas.notification.hook.maxretries";
public static final String CONSUMER_FAILEDCACHESIZE_PROPERTY = "atlas.notification.hook.failedcachesize";
public static final String CONSUMER_RETRY_INTERVAL = "atlas.notification.consumer.retry.interval";
public static final String CONSUMER_MIN_RETRY_INTERVAL = "atlas.notification.consumer.min.retry.interval";
public static final String CONSUMER_MAX_RETRY_INTERVAL = "atlas.notification.consumer.max.retry.interval";
public static final String CONSUMER_COMMIT_BATCH_SIZE = "atlas.notification.consumer.commit.batch.size";
public static final String CONSUMER_DISABLED = "atlas.notification.consumer.disabled";
public static final String CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633 = "atlas.notification.consumer.skip.hive_column_lineage.hive-20633";
public static final String CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD = "atlas.notification.consumer.skip.hive_column_lineage.hive-20633.inputs.threshold";
public static final String CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_PATTERN = "atlas.notification.consumer.preprocess.hive_table.ignore.pattern";
public static final String CONSUMER_PREPROCESS_HIVE_TABLE_PRUNE_PATTERN = "atlas.notification.consumer.preprocess.hive_table.prune.pattern";
public static final String CONSUMER_PREPROCESS_HIVE_TABLE_CACHE_SIZE = "atlas.notification.consumer.preprocess.hive_table.cache.size";
public static final String CONSUMER_PREPROCESS_HIVE_DB_IGNORE_DUMMY_ENABLED = "atlas.notification.consumer.preprocess.hive_db.ignore.dummy.enabled";
public static final String CONSUMER_PREPROCESS_HIVE_DB_IGNORE_DUMMY_NAMES = "atlas.notification.consumer.preprocess.hive_db.ignore.dummy.names";
public static final String CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_DUMMY_ENABLED = "atlas.notification.consumer.preprocess.hive_table.ignore.dummy.enabled";
public static final String CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_DUMMY_NAMES = "atlas.notification.consumer.preprocess.hive_table.ignore.dummy.names";
public static final String CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_NAME_PREFIXES_ENABLED = "atlas.notification.consumer.preprocess.hive_table.ignore.name.prefixes.enabled";
public static final String CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_NAME_PREFIXES = "atlas.notification.consumer.preprocess.hive_table.ignore.name.prefixes";
public static final String CONSUMER_PREPROCESS_HIVE_PROCESS_UPD_NAME_WITH_QUALIFIED_NAME = "atlas.notification.consumer.preprocess.hive_process.update.name.with.qualified_name";
public static final String CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS = "atlas.notification.consumer.preprocess.hive_types.remove.ownedref.attrs";
public static final String CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS = "atlas.notification.consumer.preprocess.rdbms_types.remove.ownedref.attrs";
public static final String CONSUMER_PREPROCESS_S3_V2_DIRECTORY_PRUNE_OBJECT_PREFIX = "atlas.notification.consumer.preprocess.s3_v2_directory.prune.object_prefix";
public static final String CONSUMER_AUTHORIZE_USING_MESSAGE_USER = "atlas.notification.authorize.using.message.user";
public static final String CONSUMER_AUTHORIZE_AUTHN_CACHE_TTL_SECONDS = "atlas.notification.authorize.authn.cache.ttl.seconds";
public static final int SERVER_READY_WAIT_TIME_MS = 1000;
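// Illustrative atlas-application.properties entries for the settings above;
// the values shown are the defaults applied in the constructor, for example only:
//   atlas.notification.hook.numthreads=1
//   atlas.notification.hook.maxretries=3
//   atlas.notification.consumer.retry.interval=500
//   atlas.notification.consumer.commit.batch.size=50
//   atlas.notification.consumer.disabled=false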
private final AtlasEntityStore atlasEntityStore;
private final ServiceState serviceState;
private final AtlasInstanceConverter instanceConverter;
private final AtlasTypeRegistry typeRegistry;
private final AtlasMetricsUtil metricsUtil;
private final int maxRetries;
private final int failedMsgCacheSize;
private final int minWaitDuration;
private final int maxWaitDuration;
private final int commitBatchSize;
private final boolean skipHiveColumnLineageHive20633;
private final int skipHiveColumnLineageHive20633InputsThreshold;
private final boolean updateHiveProcessNameWithQualifiedName;
private final int largeMessageProcessingTimeThresholdMs;
private final boolean consumerDisabled;
private final List<Pattern> hiveTablesToIgnore = new ArrayList<>();
private final List<Pattern> hiveTablesToPrune = new ArrayList<>();
private final List<String> hiveDummyDatabasesToIgnore;
private final List<String> hiveDummyTablesToIgnore;
private final List<String> hiveTablePrefixesToIgnore;
private final Map<String, PreprocessAction> hiveTablesCache;
private final boolean hiveTypesRemoveOwnedRefAttrs;
private final boolean rdbmsTypesRemoveOwnedRefAttrs;
private final boolean s3V2DirectoryPruneObjectPrefix;
private final boolean preprocessEnabled;
private final boolean createShellEntityForNonExistingReference;
private final boolean authorizeUsingMessageUser;
private final Map<String, Authentication> authnCache;
private final NotificationInterface notificationInterface;
private final Configuration applicationProperties;
private ExecutorService executors;
private Instant nextStatsLogTime = AtlasMetricsCounter.getNextHourStartTime(Instant.now());
private final Map<TopicPartition, Long> lastCommittedPartitionOffset;
private final EntityCorrelationManager entityCorrelationManager;
@VisibleForTesting
final int consumerRetryInterval;
@VisibleForTesting
List<HookConsumer> consumers;
@Inject
public NotificationHookConsumer(NotificationInterface notificationInterface, AtlasEntityStore atlasEntityStore,
ServiceState serviceState, AtlasInstanceConverter instanceConverter,
AtlasTypeRegistry typeRegistry, AtlasMetricsUtil metricsUtil,
EntityCorrelationStore entityCorrelationStore) throws AtlasException {
this.notificationInterface = notificationInterface;
this.atlasEntityStore = atlasEntityStore;
this.serviceState = serviceState;
this.instanceConverter = instanceConverter;
this.typeRegistry = typeRegistry;
this.applicationProperties = ApplicationProperties.get();
this.metricsUtil = metricsUtil;
this.lastCommittedPartitionOffset = new HashMap<>();
maxRetries = applicationProperties.getInt(CONSUMER_RETRIES_PROPERTY, 3);
failedMsgCacheSize = applicationProperties.getInt(CONSUMER_FAILEDCACHESIZE_PROPERTY, 1);
consumerRetryInterval = applicationProperties.getInt(CONSUMER_RETRY_INTERVAL, 500);
minWaitDuration = applicationProperties.getInt(CONSUMER_MIN_RETRY_INTERVAL, consumerRetryInterval); // 500 ms by default
maxWaitDuration = applicationProperties.getInt(CONSUMER_MAX_RETRY_INTERVAL, minWaitDuration * 60); // 30 sec by default
commitBatchSize = applicationProperties.getInt(CONSUMER_COMMIT_BATCH_SIZE, 50);
skipHiveColumnLineageHive20633 = applicationProperties.getBoolean(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, false);
skipHiveColumnLineageHive20633InputsThreshold = applicationProperties.getInt(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, 15); // skip if avg # of inputs is > 15
updateHiveProcessNameWithQualifiedName = applicationProperties.getBoolean(CONSUMER_PREPROCESS_HIVE_PROCESS_UPD_NAME_WITH_QUALIFIED_NAME, true);
consumerDisabled = applicationProperties.getBoolean(CONSUMER_DISABLED, false);
largeMessageProcessingTimeThresholdMs = applicationProperties.getInt("atlas.notification.consumer.large.message.processing.time.threshold.ms", 60 * 1000); // 60 sec by default
createShellEntityForNonExistingReference = AtlasConfiguration.NOTIFICATION_CREATE_SHELL_ENTITY_FOR_NON_EXISTING_REF.getBoolean();
authorizeUsingMessageUser = applicationProperties.getBoolean(CONSUMER_AUTHORIZE_USING_MESSAGE_USER, false);
int authnCacheTtlSeconds = applicationProperties.getInt(CONSUMER_AUTHORIZE_AUTHN_CACHE_TTL_SECONDS, 300);
authnCache = (authorizeUsingMessageUser && authnCacheTtlSeconds > 0) ? new PassiveExpiringMap<>(authnCacheTtlSeconds * 1000) : null;
String[] patternHiveTablesToIgnore = applicationProperties.getStringArray(CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_PATTERN);
String[] patternHiveTablesToPrune = applicationProperties.getStringArray(CONSUMER_PREPROCESS_HIVE_TABLE_PRUNE_PATTERN);
if (patternHiveTablesToIgnore != null) {
for (String pattern : patternHiveTablesToIgnore) {
try {
hiveTablesToIgnore.add(Pattern.compile(pattern));
LOG.info("{}={}", CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_PATTERN, pattern);
} catch (Throwable t) {
LOG.warn("failed to compile pattern {}", pattern, t);
LOG.warn("Ignoring invalid pattern in configuration {}: {}", CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_PATTERN, pattern);
}
}
}
if (patternHiveTablesToPrune != null) {
for (String pattern : patternHiveTablesToPrune) {
try {
hiveTablesToPrune.add(Pattern.compile(pattern));
LOG.info("{}={}", CONSUMER_PREPROCESS_HIVE_TABLE_PRUNE_PATTERN, pattern);
} catch (Throwable t) {
LOG.warn("failed to compile pattern {}", pattern, t);
LOG.warn("Ignoring invalid pattern in configuration {}: {}", CONSUMER_PREPROCESS_HIVE_TABLE_PRUNE_PATTERN, pattern);
}
}
}
if (!hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty()) {
hiveTablesCache = new LruCache<>(applicationProperties.getInt(CONSUMER_PREPROCESS_HIVE_TABLE_CACHE_SIZE, 10000), 0);
} else {
hiveTablesCache = Collections.emptyMap();
}
boolean hiveDbIgnoreDummyEnabled = applicationProperties.getBoolean(CONSUMER_PREPROCESS_HIVE_DB_IGNORE_DUMMY_ENABLED, true);
boolean hiveTableIgnoreDummyEnabled = applicationProperties.getBoolean(CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_DUMMY_ENABLED, true);
boolean hiveTableIgnoreNamePrefixEnabled = applicationProperties.getBoolean(CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_NAME_PREFIXES_ENABLED, true);
LOG.info("{}={}", CONSUMER_PREPROCESS_HIVE_DB_IGNORE_DUMMY_ENABLED, hiveDbIgnoreDummyEnabled);
LOG.info("{}={}", CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_DUMMY_ENABLED, hiveTableIgnoreDummyEnabled);
LOG.info("{}={}", CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_NAME_PREFIXES_ENABLED, hiveTableIgnoreNamePrefixEnabled);
if (hiveDbIgnoreDummyEnabled) {
String[] dummyDatabaseNames = applicationProperties.getStringArray(CONSUMER_PREPROCESS_HIVE_DB_IGNORE_DUMMY_NAMES);
hiveDummyDatabasesToIgnore = trimAndPurge(dummyDatabaseNames, DUMMY_DATABASE);
LOG.info("{}={}", CONSUMER_PREPROCESS_HIVE_DB_IGNORE_DUMMY_NAMES, StringUtils.join(hiveDummyDatabasesToIgnore, ','));
} else {
hiveDummyDatabasesToIgnore = Collections.emptyList();
}
if (hiveTableIgnoreDummyEnabled) {
String[] dummyTableNames = applicationProperties.getStringArray(CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_DUMMY_NAMES);
hiveDummyTablesToIgnore = trimAndPurge(dummyTableNames, DUMMY_TABLE);
LOG.info("{}={}", CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_DUMMY_NAMES, StringUtils.join(hiveDummyTablesToIgnore, ','));
} else {
hiveDummyTablesToIgnore = Collections.emptyList();
}
if (hiveTableIgnoreNamePrefixEnabled) {
String[] ignoreNamePrefixes = applicationProperties.getStringArray(CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_NAME_PREFIXES);
hiveTablePrefixesToIgnore = trimAndPurge(ignoreNamePrefixes, VALUES_TMP_TABLE_NAME_PREFIX);
LOG.info("{}={}", CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_NAME_PREFIXES, StringUtils.join(hiveTablePrefixesToIgnore, ','));
} else {
hiveTablePrefixesToIgnore = Collections.emptyList();
}
LOG.info("{}={}", CONSUMER_PREPROCESS_HIVE_PROCESS_UPD_NAME_WITH_QUALIFIED_NAME, updateHiveProcessNameWithQualifiedName);
hiveTypesRemoveOwnedRefAttrs = applicationProperties.getBoolean(CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS, true);
rdbmsTypesRemoveOwnedRefAttrs = applicationProperties.getBoolean(CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS, true);
s3V2DirectoryPruneObjectPrefix = applicationProperties.getBoolean(CONSUMER_PREPROCESS_S3_V2_DIRECTORY_PRUNE_OBJECT_PREFIX, true);
preprocessEnabled = skipHiveColumnLineageHive20633 || updateHiveProcessNameWithQualifiedName || hiveTypesRemoveOwnedRefAttrs || rdbmsTypesRemoveOwnedRefAttrs || s3V2DirectoryPruneObjectPrefix || !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || !hiveDummyDatabasesToIgnore.isEmpty() || !hiveDummyTablesToIgnore.isEmpty() || !hiveTablePrefixesToIgnore.isEmpty();
entityCorrelationManager = new EntityCorrelationManager(entityCorrelationStore);
LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, skipHiveColumnLineageHive20633);
LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, skipHiveColumnLineageHive20633InputsThreshold);
LOG.info("{}={}", CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS, hiveTypesRemoveOwnedRefAttrs);
LOG.info("{}={}", CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS, rdbmsTypesRemoveOwnedRefAttrs);
LOG.info("{}={}", CONSUMER_PREPROCESS_S3_V2_DIRECTORY_PRUNE_OBJECT_PREFIX, s3V2DirectoryPruneObjectPrefix);
LOG.info("{}={}", CONSUMER_COMMIT_BATCH_SIZE, commitBatchSize);
LOG.info("{}={}", CONSUMER_DISABLED, consumerDisabled);
}
@Override
public void start() throws AtlasException {
if (consumerDisabled) {
LOG.info("No hook messages will be processed. {} = {}", CONSUMER_DISABLED, consumerDisabled);
return;
}
startInternal(applicationProperties, null);
}
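// starts consumers inline when HA is disabled; with HA enabled, consumers are
// started only on server activation, via instanceIsActive()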
void startInternal(Configuration configuration, ExecutorService executorService) {
if (consumers == null) {
consumers = new ArrayList<>();
}
if (executorService != null) {
executors = executorService;
}
if (!HAConfiguration.isHAEnabled(configuration)) {
LOG.info("HA is disabled, starting consumers inline.");
startConsumers(executorService);
}
}
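// creates one Kafka consumer per configured thread (atlas.notification.hook.numthreads)
// and submits each to the shared executor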
private void startConsumers(ExecutorService executorService) {
int numThreads = applicationProperties.getInt(CONSUMER_THREADS_PROPERTY, 1);
List<NotificationConsumer<HookNotification>> notificationConsumers = notificationInterface.createConsumers(NotificationType.HOOK, numThreads);
if (executorService == null) {
executorService = Executors.newFixedThreadPool(notificationConsumers.size(), new ThreadFactoryBuilder().setNameFormat(THREADNAME_PREFIX + " thread-%d").build());
}
executors = executorService;
for (final NotificationConsumer<HookNotification> consumer : notificationConsumers) {
HookConsumer hookConsumer = new HookConsumer(consumer);
consumers.add(hookConsumer);
executors.submit(hookConsumer);
}
}
@Override
public void stop() {
//Allow for completion of outstanding work
try {
if (consumerDisabled) {
return;
}
stopConsumerThreads();
if (executors != null) {
executors.shutdown();
if (!executors.awaitTermination(KAFKA_CONSUMER_SHUTDOWN_WAIT, TimeUnit.MILLISECONDS)) {
LOG.error("Timed out waiting for consumer threads to shut down, exiting uncleanly");
}
executors = null;
}
notificationInterface.close();
} catch (InterruptedException e) {
LOG.error("Failure in shutting down consumers", e);
Thread.currentThread().interrupt();
}
}
private void stopConsumerThreads() {
LOG.info("==> stopConsumerThreads()");
if (consumers != null) {
for (HookConsumer consumer : consumers) {
consumer.shutdown();
}
consumers.clear();
}
LOG.info("<== stopConsumerThreads()");
}
/**
* Start Kafka consumer threads that read from Kafka topic when server is activated.
* <p>
* Since the consumers create / update entities to the shared backend store, only the active instance
* should perform this activity. Hence, these threads are started only on server activation.
*/
@Override
public void instanceIsActive() {
if (consumerDisabled) {
return;
}
LOG.info("Reacting to active state: initializing Kafka consumers");
startConsumers(executors);
}
/**
* Stop Kafka consumer threads that read from Kafka topic when server is de-activated.
* <p>
* Since the consumers create / update entities to the shared backend store, only the active instance
* should perform this activity. Hence, these threads are stopped only on server deactivation.
*/
@Override
public void instanceIsPassive() {
if (consumerDisabled) {
return;
}
LOG.info("Reacting to passive state: shutting down Kafka consumers.");
stop();
}
@Override
public int getHandlerOrder() {
return HandlerOrder.NOTIFICATION_HOOK_CONSUMER.getOrder();
}
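// thin wrapper over Thread.sleep(), so tests can substitute a timer that does not really wait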
static class Timer {
public void sleep(int interval) throws InterruptedException {
Thread.sleep(interval);
}
}
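// trims entries and drops blanks; falls back to the given default when no values are configured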
private List<String> trimAndPurge(String[] values, String defaultValue) {
final List<String> ret;
if (values != null && values.length > 0) {
ret = new ArrayList<>(values.length);
for (String val : values) {
if (StringUtils.isNotBlank(val)) {
ret.add(val.trim());
}
}
} else if (StringUtils.isNotBlank(defaultValue)) {
ret = Collections.singletonList(defaultValue.trim());
} else {
ret = Collections.emptyList();
}
return ret;
}
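/**
* Linear backoff helper: the wait starts at minDuration, grows by 'increment' on each
* consecutive pause up to maxDuration, and resets to minDuration once no pause has been
* needed for longer than 2 * maxDuration.
*/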
static class AdaptiveWaiter {
private final long increment;
private final long maxDuration;
private final long minDuration;
private final long resetInterval;
private long lastWaitAt;
@VisibleForTesting
long waitDuration;
public AdaptiveWaiter(long minDuration, long maxDuration, long increment) {
this.minDuration = minDuration;
this.maxDuration = maxDuration;
this.increment = increment;
this.waitDuration = minDuration;
this.lastWaitAt = 0;
this.resetInterval = maxDuration * 2;
}
public void pause(Exception ex) {
setWaitDurations();
try {
if (LOG.isDebugEnabled()) {
LOG.debug("{} in NotificationHookConsumer. Waiting for {} ms for recovery.", ex.getClass().getName(), waitDuration, ex);
}
Thread.sleep(waitDuration);
} catch (InterruptedException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("{} in NotificationHookConsumer. Waiting for recovery interrupted.", ex.getClass().getName(), e);
}
}
}
private void setWaitDurations() {
long timeSinceLastWait = (lastWaitAt == 0) ? 0 : System.currentTimeMillis() - lastWaitAt;
lastWaitAt = System.currentTimeMillis();
if (timeSinceLastWait > resetInterval) {
waitDuration = minDuration;
} else {
waitDuration += increment;
if (waitDuration > maxDuration) {
waitDuration = maxDuration;
}
}
}
}
@VisibleForTesting
class HookConsumer extends Thread {
private final NotificationConsumer<HookNotification> consumer;
private final AtomicBoolean shouldRun = new AtomicBoolean(false);
private final List<String> failedMessages = new ArrayList<>();
private final AdaptiveWaiter adaptiveWaiter = new AdaptiveWaiter(minWaitDuration, maxWaitDuration, minWaitDuration);
public HookConsumer(NotificationConsumer<HookNotification> consumer) {
super("atlas-hook-consumer-thread");
this.consumer = consumer;
}
@Override
public void run() {
LOG.info("==> HookConsumer run()");
shouldRun.set(true);
if (!serverAvailable(new Timer())) {
return;
}
try {
while (shouldRun.get()) {
try {
List<AtlasKafkaMessage<HookNotification>> messages = consumer.receiveWithCheckedCommit(lastCommittedPartitionOffset);
for (AtlasKafkaMessage<HookNotification> msg : messages) {
handleMessage(msg);
}
} catch (IllegalStateException ex) {
adaptiveWaiter.pause(ex);
} catch (Exception e) {
if (shouldRun.get()) {
LOG.warn("Exception in NotificationHookConsumer", e);
adaptiveWaiter.pause(e);
} else {
break;
}
}
}
} finally {
if (consumer != null) {
LOG.info("closing NotificationConsumer");
consumer.close();
}
LOG.info("<== HookConsumer run()");
}
}
@VisibleForTesting
void handleMessage(AtlasKafkaMessage<HookNotification> kafkaMsg) {
AtlasPerfTracer perf = null;
HookNotification message = kafkaMsg.getMessage();
String messageUser = message.getUser();
long startTime = System.currentTimeMillis();
NotificationStat stats = new NotificationStat();
AuditLog auditLog = null;
if (authorizeUsingMessageUser) {
setCurrentUser(messageUser);
}
if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, message.getType().name());
}
try {
// convert V1 messages to V2 to enable preprocessing
try {
switch (message.getType()) {
case ENTITY_CREATE: {
final EntityCreateRequest createRequest = (EntityCreateRequest) message;
final AtlasEntitiesWithExtInfo entities = instanceConverter.toAtlasEntities(createRequest.getEntities());
final EntityCreateRequestV2 v2Request = new EntityCreateRequestV2(message.getUser(), entities);
kafkaMsg = new AtlasKafkaMessage<>(v2Request, kafkaMsg.getOffset(), kafkaMsg.getTopic(), kafkaMsg.getPartition());
message = kafkaMsg.getMessage();
}
break;
case ENTITY_FULL_UPDATE: {
final EntityUpdateRequest updateRequest = (EntityUpdateRequest) message;
final AtlasEntitiesWithExtInfo entities = instanceConverter.toAtlasEntities(updateRequest.getEntities());
final EntityUpdateRequestV2 v2Request = new EntityUpdateRequestV2(messageUser, entities);
kafkaMsg = new AtlasKafkaMessage<>(v2Request, kafkaMsg.getOffset(), kafkaMsg.getTopic(), kafkaMsg.getPartition());
message = kafkaMsg.getMessage();
}
break;
}
} catch (AtlasBaseException excp) {
LOG.error("handleMessage(): failed to convert V1 message of type {} to V2", message.getType().name(), excp);
}
PreprocessorContext context = preProcessNotificationMessage(kafkaMsg);
if (isEmptyMessage(kafkaMsg)) {
commit(kafkaMsg);
return;
}
// tracks the exception class from the previous failed attempt, used in retry logging
String exceptionClassName = StringUtils.EMPTY;
for (int numRetries = 0; numRetries < maxRetries; numRetries++) {
if (LOG.isDebugEnabled()) {
LOG.debug("handleMessage({}): attempt {}", message.getType().name(), numRetries);
}
try {
RequestContext requestContext = RequestContext.get();
requestContext.setAttemptCount(numRetries + 1);
requestContext.setMaxAttempts(maxRetries);
requestContext.setUser(messageUser, null);
requestContext.setInNotificationProcessing(true);
requestContext.setCreateShellEntityForNonExistingReference(createShellEntityForNonExistingReference);
switch (message.getType()) {
case ENTITY_CREATE: {
final EntityCreateRequest createRequest = (EntityCreateRequest) message;
final AtlasEntitiesWithExtInfo entities = instanceConverter.toAtlasEntities(createRequest.getEntities());
if (auditLog == null) {
auditLog = new AuditLog(messageUser, THREADNAME_PREFIX,
AtlasClient.API_V1.CREATE_ENTITY.getMethod(),
AtlasClient.API_V1.CREATE_ENTITY.getNormalizedPath());
}
createOrUpdate(entities, false, stats, context);
}
break;
case ENTITY_PARTIAL_UPDATE: {
final EntityPartialUpdateRequest partialUpdateRequest = (EntityPartialUpdateRequest) message;
final Referenceable referenceable = partialUpdateRequest.getEntity();
final AtlasEntitiesWithExtInfo entities = instanceConverter.toAtlasEntity(referenceable);
if (auditLog == null) {
auditLog = new AuditLog(messageUser, THREADNAME_PREFIX,
AtlasClientV2.API_V2.UPDATE_ENTITY_BY_ATTRIBUTE.getMethod(),
String.format(AtlasClientV2.API_V2.UPDATE_ENTITY_BY_ATTRIBUTE.getNormalizedPath(), partialUpdateRequest.getTypeName()));
}
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(partialUpdateRequest.getTypeName());
String guid = AtlasGraphUtilsV2.getGuidByUniqueAttributes(entityType, Collections.singletonMap(partialUpdateRequest.getAttribute(), (Object) partialUpdateRequest.getAttributeValue()));
// There should only be one root entity
entities.getEntities().get(0).setGuid(guid);
createOrUpdate(entities, true, stats, context);
}
break;
case ENTITY_DELETE: {
final EntityDeleteRequest deleteRequest = (EntityDeleteRequest) message;
if (auditLog == null) {
auditLog = new AuditLog(messageUser, THREADNAME_PREFIX,
AtlasClientV2.API_V2.DELETE_ENTITY_BY_ATTRIBUTE.getMethod(),
String.format(AtlasClientV2.API_V2.DELETE_ENTITY_BY_ATTRIBUTE.getNormalizedPath(), deleteRequest.getTypeName()));
}
try {
AtlasEntityType type = (AtlasEntityType) typeRegistry.getType(deleteRequest.getTypeName());
EntityMutationResponse response = atlasEntityStore.deleteByUniqueAttributes(type, Collections.singletonMap(deleteRequest.getAttribute(), (Object) deleteRequest.getAttributeValue()));
stats.updateStats(response);
entityCorrelationManager.add(kafkaMsg.getSpooled(), kafkaMsg.getMsgCreated(), response.getDeletedEntities());
} catch (ClassCastException cle) {
LOG.error("Failed to delete entity {}", deleteRequest);
}
}
break;
case ENTITY_FULL_UPDATE: {
final EntityUpdateRequest updateRequest = (EntityUpdateRequest) message;
final AtlasEntitiesWithExtInfo entities = instanceConverter.toAtlasEntities(updateRequest.getEntities());
if (auditLog == null) {
auditLog = new AuditLog(messageUser, THREADNAME_PREFIX,
AtlasClientV2.API_V2.UPDATE_ENTITY.getMethod(),
AtlasClientV2.API_V2.UPDATE_ENTITY.getNormalizedPath());
}
createOrUpdate(entities, false, stats, context);
}
break;
case ENTITY_CREATE_V2: {
final EntityCreateRequestV2 createRequestV2 = (EntityCreateRequestV2) message;
final AtlasEntitiesWithExtInfo entities = createRequestV2.getEntities();
if (auditLog == null) {
auditLog = new AuditLog(messageUser, THREADNAME_PREFIX,
AtlasClientV2.API_V2.CREATE_ENTITY.getMethod(),
AtlasClientV2.API_V2.CREATE_ENTITY.getNormalizedPath());
}
createOrUpdate(entities, false, stats, context);
}
break;
case ENTITY_PARTIAL_UPDATE_V2: {
final EntityPartialUpdateRequestV2 partialUpdateRequest = (EntityPartialUpdateRequestV2) message;
final AtlasObjectId entityId = partialUpdateRequest.getEntityId();
final AtlasEntityWithExtInfo entity = partialUpdateRequest.getEntity();
if (auditLog == null) {
auditLog = new AuditLog(messageUser, THREADNAME_PREFIX,
AtlasClientV2.API_V2.UPDATE_ENTITY.getMethod(),
AtlasClientV2.API_V2.UPDATE_ENTITY.getNormalizedPath());
}
EntityMutationResponse response = atlasEntityStore.updateEntity(entityId, entity, true);
stats.updateStats(response);
}
break;
case ENTITY_FULL_UPDATE_V2: {
final EntityUpdateRequestV2 updateRequest = (EntityUpdateRequestV2) message;
final AtlasEntitiesWithExtInfo entities = updateRequest.getEntities();
if (auditLog == null) {
auditLog = new AuditLog(messageUser, THREADNAME_PREFIX,
AtlasClientV2.API_V2.UPDATE_ENTITY.getMethod(),
AtlasClientV2.API_V2.UPDATE_ENTITY.getNormalizedPath());
}
createOrUpdate(entities, false, stats, context);
}
break;
case ENTITY_DELETE_V2: {
final EntityDeleteRequestV2 deleteRequest = (EntityDeleteRequestV2) message;
final List<AtlasObjectId> entities = deleteRequest.getEntities();
try {
for (AtlasObjectId entity : entities) {
if (auditLog == null) {
auditLog = new AuditLog(messageUser, THREADNAME_PREFIX,
AtlasClientV2.API_V2.DELETE_ENTITY_BY_ATTRIBUTE.getMethod(),
String.format(AtlasClientV2.API_V2.DELETE_ENTITY_BY_ATTRIBUTE.getNormalizedPath(), entity.getTypeName()));
}
AtlasEntityType type = (AtlasEntityType) typeRegistry.getType(entity.getTypeName());
EntityMutationResponse response = atlasEntityStore.deleteByUniqueAttributes(type, entity.getUniqueAttributes());
stats.updateStats(response);
entityCorrelationManager.add(kafkaMsg.getSpooled(), kafkaMsg.getMsgCreated(), response.getDeletedEntities());
}
} catch (ClassCastException cle) {
LOG.error("Failed to do delete entities {}", entities);
}
}
break;
default:
throw new IllegalStateException("Unknown notification type: " + message.getType().name());
}
if (StringUtils.isNotEmpty(exceptionClassName)) {
LOG.warn("{}: Pausing & retry: Try: {}: Pause: {} ms. Handled!",
exceptionClassName, numRetries, adaptiveWaiter.waitDuration);
exceptionClassName = StringUtils.EMPTY;
}
break;
} catch (Throwable e) {
RequestContext.get().resetEntityGuidUpdates();
exceptionClassName = e.getClass().getSimpleName();
if (numRetries == (maxRetries - 1)) {
String strMessage = AbstractNotification.getMessageJson(message);
LOG.warn("Max retries exceeded for message {}", strMessage, e);
stats.isFailedMsg = true;
failedMessages.add(strMessage);
if (failedMessages.size() >= failedMsgCacheSize) {
recordFailedMessages();
}
return;
} else if (e instanceof org.apache.atlas.repository.graphdb.AtlasSchemaViolationException) {
LOG.warn("{}: Continuing: {}", exceptionClassName, e.getMessage());
} else if (exceptionClassName.equals(EXCEPTION_CLASS_NAME_JANUSGRAPH_EXCEPTION)
|| exceptionClassName.equals(EXCEPTION_CLASS_NAME_PERMANENTLOCKING_EXCEPTION)) {
LOG.warn("{}: Pausing & retry: Try: {}: Pause: {} ms. {}",
exceptionClassName, numRetries, adaptiveWaiter.waitDuration, e.getMessage());
adaptiveWaiter.pause((Exception) e);
} else {
LOG.warn("Error handling message", e);
try {
LOG.info("Sleeping for {} ms before retry", consumerRetryInterval);
Thread.sleep(consumerRetryInterval);
} catch (InterruptedException ie) {
LOG.error("Notification consumer thread sleep interrupted");
}
}
} finally {
RequestContext.clear();
}
}
commit(kafkaMsg);
} finally {
AtlasPerfTracer.log(perf);
stats.timeTakenMs = System.currentTimeMillis() - startTime;
metricsUtil.onNotificationProcessingComplete(kafkaMsg.getTopic(), kafkaMsg.getPartition(), kafkaMsg.getOffset(), stats);
if (stats.timeTakenMs > largeMessageProcessingTimeThresholdMs) {
String strMessage = AbstractNotification.getMessageJson(message);
LOG.warn("msgProcessingTime={}, msgSize={}, topicOffset={}}", stats.timeTakenMs, strMessage.length(), kafkaMsg.getOffset());
LARGE_MESSAGES_LOG.warn("{\"msgProcessingTime\":{},\"msgSize\":{},\"topicOffset\":{},\"data\":{}}", stats.timeTakenMs, strMessage.length(), kafkaMsg.getOffset(), strMessage);
}
if (auditLog != null) {
auditLog.setHttpStatus(stats.isFailedMsg ? SC_BAD_REQUEST : SC_OK);
auditLog.setTimeTaken(stats.timeTakenMs);
AuditFilter.audit(auditLog);
}
Instant now = Instant.now();
if (now.isAfter(nextStatsLogTime)) {
LOG.info("STATS: {}", AtlasJson.toJson(metricsUtil.getStats()));
nextStatsLogTime = AtlasMetricsCounter.getNextHourStartTime(now);
}
}
}
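// persists entities in commitBatchSize chunks when the payload is large; guid assignments
// from earlier batches are propagated into later batches (updateProcessedEntityReferences)
// so cross-batch references resolve to the already-persisted entities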
private void createOrUpdate(AtlasEntitiesWithExtInfo entities, boolean isPartialUpdate, NotificationStat stats, PreprocessorContext context) throws AtlasBaseException {
List<AtlasEntity> entitiesList = entities.getEntities();
AtlasEntityStream entityStream = new AtlasEntityStream(entities);
if (commitBatchSize <= 0 || entitiesList.size() <= commitBatchSize) {
EntityMutationResponse response = atlasEntityStore.createOrUpdate(entityStream, isPartialUpdate);
recordProcessedEntities(response, stats, context);
} else {
for (int fromIdx = 0; fromIdx < entitiesList.size(); fromIdx += commitBatchSize) {
int toIndex = fromIdx + commitBatchSize;
if (toIndex > entitiesList.size()) {
toIndex = entitiesList.size();
}
List<AtlasEntity> entitiesBatch = new ArrayList<>(entitiesList.subList(fromIdx, toIndex));
updateProcessedEntityReferences(entitiesBatch, context.getGuidAssignments());
AtlasEntitiesWithExtInfo batch = new AtlasEntitiesWithExtInfo(entitiesBatch);
AtlasEntityStream batchStream = new AtlasEntityStream(batch, entityStream);
EntityMutationResponse response = atlasEntityStore.createOrUpdate(batchStream, isPartialUpdate);
recordProcessedEntities(response, stats, context);
RequestContext.get().resetEntityGuidUpdates();
entityCorrelationManager.add(context.isSpooledMessage(), context.getMsgCreated(), response.getDeletedEntities());
RequestContext.get().clearCache();
}
}
if (context != null) {
context.prepareForPostUpdate();
List<AtlasEntity> postUpdateEntities = context.getPostUpdateEntities();
if (CollectionUtils.isNotEmpty(postUpdateEntities)) {
atlasEntityStore.createOrUpdate(new AtlasEntityStream(postUpdateEntities), true);
}
}
}
private void recordFailedMessages() {
// log accumulated failed messages to the FAILED logger, then clear the cache
for (String message : failedMessages) {
FAILED_LOG.error("[DROPPED_NOTIFICATION] {}", message);
}
failedMessages.clear();
}
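// flushes any failed messages, then commits offset + 1: Kafka expects the committed
// value to be the offset of the next message to consume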
private void commit(AtlasKafkaMessage<HookNotification> kafkaMessage) {
recordFailedMessages();
long commitOffset = kafkaMessage.getOffset() + 1;
lastCommittedPartitionOffset.put(kafkaMessage.getTopicPartition(), commitOffset);
consumer.commit(kafkaMessage.getTopicPartition(), commitOffset);
}
boolean serverAvailable(Timer timer) {
try {
while (serviceState.getState() != ServiceState.ServiceStateValue.ACTIVE) {
try {
LOG.info("Atlas Server is not ready. Waiting for {} milliseconds to retry...", SERVER_READY_WAIT_TIME_MS);
timer.sleep(SERVER_READY_WAIT_TIME_MS);
} catch (InterruptedException e) {
LOG.info("Interrupted while waiting for Atlas Server to become ready, " + "exiting consumer thread.", e);
return false;
}
}
} catch (Throwable e) {
LOG.info("Handled AtlasServiceException while waiting for Atlas Server to become ready, exiting consumer thread.", e);
return false;
}
LOG.info("Atlas Server is ready, can start reading Kafka events.");
return true;
}
public void shutdown() {
LOG.info("==> HookConsumer shutdown()");
// no-op if the thread was never started or shutdown was already invoked
if (!shouldRun.compareAndSet(true, false)) {
return;
}
if (consumer != null) {
consumer.wakeup();
}
LOG.info("<== HookConsumer shutdown()");
}
}
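// applies the configured preprocessors (hive ignore/prune, HIVE-20633 lineage skip, rdbms
// owned-ref removal, s3 v2 object-prefix pruning) and moves hive_process/hive_column_lineage
// entities to the end of the list, so the entities they reference are processed first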
private PreprocessorContext preProcessNotificationMessage(AtlasKafkaMessage<HookNotification> kafkaMsg) {
PreprocessorContext context = null;
if (preprocessEnabled) {
context = new PreprocessorContext(kafkaMsg, typeRegistry, hiveTablesToIgnore, hiveTablesToPrune, hiveTablesCache,
hiveDummyDatabasesToIgnore, hiveDummyTablesToIgnore, hiveTablePrefixesToIgnore, hiveTypesRemoveOwnedRefAttrs,
rdbmsTypesRemoveOwnedRefAttrs, s3V2DirectoryPruneObjectPrefix, updateHiveProcessNameWithQualifiedName, entityCorrelationManager);
if (context.isHivePreprocessEnabled()) {
preprocessHiveTypes(context);
}
if (skipHiveColumnLineageHive20633) {
skipHiveColumnLineage(context);
}
if (rdbmsTypesRemoveOwnedRefAttrs) {
rdbmsTypeRemoveOwnedRefAttrs(context);
}
if (s3V2DirectoryPruneObjectPrefix) {
pruneObjectPrefixForS3V2Directory(context);
}
context.moveRegisteredReferredEntities();
if (context.isHivePreprocessEnabled() && CollectionUtils.isNotEmpty(context.getEntities()) && context.getEntities().size() > 1) {
// move hive_process and hive_column_lineage entities to end of the list
List<AtlasEntity> entities = context.getEntities();
int count = entities.size();
for (int i = 0; i < count; i++) {
AtlasEntity entity = entities.get(i);
switch (entity.getTypeName()) {
case TYPE_HIVE_PROCESS:
case TYPE_HIVE_COLUMN_LINEAGE:
entities.remove(i--);
entities.add(entity);
count--;
break;
}
}
if (entities.size() - count > 0) {
LOG.info("preprocess: moved {} hive_process/hive_column_lineage entities to end of list (listSize={}). topic-offset={}, partition={}", entities.size() - count, entities.size(), kafkaMsg.getOffset(), kafkaMsg.getPartition());
}
}
}
return context;
}
private void rdbmsTypeRemoveOwnedRefAttrs(PreprocessorContext context) {
List<AtlasEntity> entities = context.getEntities();
if (entities != null) {
for (int i = 0; i < entities.size(); i++) {
AtlasEntity entity = entities.get(i);
EntityPreprocessor preprocessor = EntityPreprocessor.getRdbmsPreprocessor(entity.getTypeName());
if (preprocessor != null) {
preprocessor.preprocess(entity, context);
}
}
}
}
private void pruneObjectPrefixForS3V2Directory(PreprocessorContext context) {
List<AtlasEntity> entities = new ArrayList<>();
if (CollectionUtils.isNotEmpty(context.getEntities())) {
entities.addAll(context.getEntities());
}
if (MapUtils.isNotEmpty(context.getReferredEntities())) {
entities.addAll(context.getReferredEntities().values());
}
if (CollectionUtils.isNotEmpty(entities)) {
for (AtlasEntity entity : entities) {
EntityPreprocessor preprocessor = EntityPreprocessor.getS3V2Preprocessor(entity.getTypeName());
if (preprocessor != null) {
preprocessor.preprocess(entity, context);
}
}
}
}
private void preprocessHiveTypes(PreprocessorContext context) {
List<AtlasEntity> entities = context.getEntities();
if (entities != null) {
for (int i = 0; i < entities.size(); i++) {
AtlasEntity entity = entities.get(i);
EntityPreprocessor preprocessor = EntityPreprocessor.getHivePreprocessor(entity.getTypeName());
if (preprocessor != null) {
preprocessor.preprocess(entity, context);
if (context.isIgnoredEntity(entity.getGuid())) {
entities.remove(i--);
}
}
}
Map<String, AtlasEntity> referredEntities = context.getReferredEntities();
if (referredEntities != null) {
for (Iterator<Map.Entry<String, AtlasEntity>> iter = referredEntities.entrySet().iterator(); iter.hasNext(); ) {
AtlasEntity entity = iter.next().getValue();
EntityPreprocessor preprocessor = EntityPreprocessor.getHivePreprocessor(entity.getTypeName());
if (preprocessor != null) {
preprocessor.preprocess(entity, context);
if (context.isIgnoredEntity(entity.getGuid())) {
iter.remove();
}
}
}
}
int ignoredEntities = context.getIgnoredEntities().size();
int prunedEntities = context.getPrunedEntities().size();
if (ignoredEntities > 0 || prunedEntities > 0) {
LOG.info("preprocess: ignored entities={}; pruned entities={}. topic-offset={}, partition={}", ignoredEntities, prunedEntities, context.getKafkaMessageOffset(), context.getKafkaPartition());
}
}
}
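// workaround for HIVE-20633: drops duplicate hive_column_lineage entities (by qualifiedName)
// and, when the average number of inputs per lineage entity exceeds the configured threshold,
// removes all hive_column_lineage entities from the message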
private void skipHiveColumnLineage(PreprocessorContext context) {
List<AtlasEntity> entities = context.getEntities();
if (entities != null) {
int lineageCount = 0;
int lineageInputsCount = 0;
int numRemovedEntities = 0;
Set<String> lineageQNames = new HashSet<>();
// find if all hive_column_lineage entities have same number of inputs, which is likely to be caused by HIVE-20633 that results in incorrect lineage in some cases
for (int i = 0; i < entities.size(); i++) {
AtlasEntity entity = entities.get(i);
if (StringUtils.equals(entity.getTypeName(), TYPE_HIVE_COLUMN_LINEAGE)) {
final Object qName = entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME);
if (qName != null) {
final String qualifiedName = qName.toString();
if (lineageQNames.contains(qualifiedName)) {
entities.remove(i--);
LOG.warn("removed duplicate hive_column_lineage entity: qualifiedName={}. topic-offset={}, partition={}", qualifiedName, context.getKafkaMessageOffset(), context.getKafkaPartition());
numRemovedEntities++;
continue;
} else {
lineageQNames.add(qualifiedName);
}
}
lineageCount++;
Object objInputs = entity.getAttribute(ATTRIBUTE_INPUTS);
if (objInputs instanceof Collection) {
Collection<?> inputs = (Collection<?>) objInputs;
lineageInputsCount += inputs.size();
}
}
}
float avgInputsCount = lineageCount > 0 ? (((float) lineageInputsCount) / lineageCount) : 0;
if (avgInputsCount > skipHiveColumnLineageHive20633InputsThreshold) {
for (int i = 0; i < entities.size(); i++) {
AtlasEntity entity = entities.get(i);
if (StringUtils.equals(entity.getTypeName(), TYPE_HIVE_COLUMN_LINEAGE)) {
entities.remove(i--);
numRemovedEntities++;
}
}
}
if (numRemovedEntities > 0) {
LOG.warn("removed {} hive_column_lineage entities. Average # of inputs={}, threshold={}, total # of inputs={}. topic-offset={}, partition={}", numRemovedEntities, avgInputsCount, skipHiveColumnLineageHive20633InputsThreshold, lineageInputsCount, context.getKafkaMessageOffset(), context.getKafkaPartition());
}
}
}
private boolean isEmptyMessage(AtlasKafkaMessage<HookNotification> kafkaMsg) {
final boolean ret;
final HookNotification message = kafkaMsg.getMessage();
switch (message.getType()) {
case ENTITY_CREATE_V2: {
AtlasEntitiesWithExtInfo entities = ((EntityCreateRequestV2) message).getEntities();
ret = entities == null || CollectionUtils.isEmpty(entities.getEntities());
}
break;
case ENTITY_FULL_UPDATE_V2: {
AtlasEntitiesWithExtInfo entities = ((EntityUpdateRequestV2) message).getEntities();
ret = entities == null || CollectionUtils.isEmpty(entities.getEntities());
}
break;
default:
ret = false;
break;
}
return ret;
}
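// accumulates mutation results into the stats and into the preprocessor context:
// guid assignments plus created/deleted entity guids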
private void recordProcessedEntities(EntityMutationResponse mutationResponse, NotificationStat stats, PreprocessorContext context) {
if (mutationResponse != null) {
if (stats != null) {
stats.updateStats(mutationResponse);
}
if (context != null) {
if (MapUtils.isNotEmpty(mutationResponse.getGuidAssignments())) {
context.getGuidAssignments().putAll(mutationResponse.getGuidAssignments());
}
if (CollectionUtils.isNotEmpty(mutationResponse.getCreatedEntities())) {
for (AtlasEntityHeader entity : mutationResponse.getCreatedEntities()) {
if (entity != null && entity.getGuid() != null) {
context.getCreatedEntities().add(entity.getGuid());
}
}
}
if (CollectionUtils.isNotEmpty(mutationResponse.getDeletedEntities())) {
for (AtlasEntityHeader entity : mutationResponse.getDeletedEntities()) {
if (entity != null && entity.getGuid() != null) {
context.getDeletedEntities().add(entity.getGuid());
}
}
}
}
}
}
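// rewrites object-id references in the given entities to the guids assigned in earlier
// batches, covering attributes, relationship attributes, collections and maps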
private void updateProcessedEntityReferences(List<AtlasEntity> entities, Map<String, String> guidAssignments) {
if (CollectionUtils.isNotEmpty(entities) && MapUtils.isNotEmpty(guidAssignments)) {
for (AtlasEntity entity : entities) {
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
if (entityType == null) {
continue;
}
if (MapUtils.isNotEmpty(entity.getAttributes())) {
for (Map.Entry<String, Object> entry : entity.getAttributes().entrySet()) {
String attrName = entry.getKey();
Object attrValue = entry.getValue();
if (attrValue == null) {
continue;
}
AtlasAttribute attribute = entityType.getAttribute(attrName);
if (attribute == null) { // look for a relationship attribute with the same name
attribute = entityType.getRelationshipAttribute(attrName, null);
}
if (attribute != null && attribute.isObjectRef()) {
updateProcessedEntityReferences(attrValue, guidAssignments);
}
}
}
if (MapUtils.isNotEmpty(entity.getRelationshipAttributes())) {
for (Map.Entry<String, Object> entry : entity.getRelationshipAttributes().entrySet()) {
Object attrValue = entry.getValue();
if (attrValue != null) {
updateProcessedEntityReferences(attrValue, guidAssignments);
}
}
}
}
}
}
private void updateProcessedEntityReferences(Object objVal, Map<String, String> guidAssignments) {
if (objVal instanceof AtlasObjectId) {
updateProcessedEntityReferences((AtlasObjectId) objVal, guidAssignments);
} else if (objVal instanceof Collection) {
updateProcessedEntityReferences((Collection) objVal, guidAssignments);
} else if (objVal instanceof Map) {
updateProcessedEntityReferences((Map) objVal, guidAssignments);
}
}
private void updateProcessedEntityReferences(AtlasObjectId objId, Map<String, String> guidAssignments) {
String guid = objId.getGuid();
if (guid != null && guidAssignments.containsKey(guid)) {
String assignedGuid = guidAssignments.get(guid);
if (LOG.isDebugEnabled()) {
LOG.debug("{}(guid={}) is already processed; updating its reference to use assigned-guid={}", objId.getTypeName(), guid, assignedGuid);
}
objId.setGuid(assignedGuid);
objId.setTypeName(null);
objId.setUniqueAttributes(null);
}
}
private void updateProcessedEntityReferences(Map objId, Map<String, String> guidAssignments) {
Object guid = objId.get(KEY_GUID);
if (guid != null && guidAssignments.containsKey(guid)) {
String assignedGuid = guidAssignments.get(guid);
if (LOG.isDebugEnabled()) {
LOG.debug("{}(guid={}) is already processed; updating its reference to use assigned-guid={}", objId.get(KEY_TYPENAME), guid, assignedGuid);
}
objId.put(KEY_GUID, assignedGuid);
objId.remove(KEY_TYPENAME);
objId.remove(KEY_UNIQUE_ATTRIBUTES);
}
}
private void updateProcessedEntityReferences(Collection objIds, Map<String, String> guidAssignments) {
for (Object objId : objIds) {
updateProcessedEntityReferences(objId, guidAssignments);
}
}
private void setCurrentUser(String userName) {
Authentication authentication = getAuthenticationForUser(userName);
if (LOG.isDebugEnabled()) {
if (authentication != null) {
LOG.debug("setCurrentUser(): notification processing will be authorized as user '{}'", userName);
} else {
LOG.debug("setCurrentUser(): Failed to get authentication for user '{}'.", userName);
}
}
SecurityContextHolder.getContext().setAuthentication(authentication);
}
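// builds an authentication token carrying the authorities resolved from UGI for the given
// user, caching it when the authn cache (configured TTL) is enabled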
private Authentication getAuthenticationForUser(String userName) {
Authentication ret = null;
if (StringUtils.isNotBlank(userName)) {
ret = authnCache != null ? authnCache.get(userName) : null;
if (ret == null) {
List<GrantedAuthority> grantedAuths = getAuthoritiesFromUGI(userName);
UserDetails principal = new User(userName, "", grantedAuths);
ret = new UsernamePasswordAuthenticationToken(principal, "");
if (authnCache != null) {
authnCache.put(userName, ret);
}
}
}
return ret;
}
}