ATLAS-4619: Refactor Atlas webapp module to remove Kafka core dependency
Change-Id: Ie3422851cb711da4e7c4d0845319db6c33333f65
Signed-off-by: Pinal Shah <pinal.shah@freestoneinfotech.com>
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 49c504f..1cdfcef 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -19,7 +19,6 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import kafka.utils.ShutdownableThread;
import org.apache.atlas.*;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.ha.HAConfiguration;
@@ -122,6 +121,7 @@
private static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
private static final String EXCEPTION_CLASS_NAME_JANUSGRAPH_EXCEPTION = "JanusGraphException";
private static final String EXCEPTION_CLASS_NAME_PERMANENTLOCKING_EXCEPTION = "PermanentLockingException";
+ private static final int KAFKA_CONSUMER_SHUTDOWN_WAIT = 30000;
// from org.apache.hadoop.hive.ql.parse.SemanticAnalyzer
public static final String DUMMY_DATABASE = "_dummy_database";
@@ -379,7 +379,7 @@
if (executors != null) {
executors.shutdown();
- if (!executors.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
+ if (!executors.awaitTermination(KAFKA_CONSUMER_SHUTDOWN_WAIT, TimeUnit.MILLISECONDS)) {
LOG.error("Timed out waiting for consumer threads to shut down, exiting uncleanly");
}
@@ -523,21 +523,21 @@
}
@VisibleForTesting
- class HookConsumer extends ShutdownableThread {
+ class HookConsumer extends Thread {
private final NotificationConsumer<HookNotification> consumer;
private final AtomicBoolean shouldRun = new AtomicBoolean(false);
private final List<String> failedMessages = new ArrayList<>();
private final AdaptiveWaiter adaptiveWaiter = new AdaptiveWaiter(minWaitDuration, maxWaitDuration, minWaitDuration);
public HookConsumer(NotificationConsumer<HookNotification> consumer) {
- super("atlas-hook-consumer-thread", false);
+ super("atlas-hook-consumer-thread");
this.consumer = consumer;
}
@Override
- public void doWork() {
- LOG.info("==> HookConsumer doWork()");
+ public void run() {
+ LOG.info("==> HookConsumer run()");
shouldRun.set(true);
@@ -572,12 +572,12 @@
consumer.close();
}
- LOG.info("<== HookConsumer doWork()");
+ LOG.info("<== HookConsumer run()");
}
}
@VisibleForTesting
- void handleMessage(AtlasKafkaMessage<HookNotification> kafkaMsg) throws AtlasServiceException, AtlasException {
+ void handleMessage(AtlasKafkaMessage<HookNotification> kafkaMsg) {
AtlasPerfTracer perf = null;
HookNotification message = kafkaMsg.getMessage();
String messageUser = message.getUser();
@@ -957,26 +957,19 @@
return true;
}
- @Override
public void shutdown() {
LOG.info("==> HookConsumer shutdown()");
// handle the case where thread was not started at all
// and shutdown called
- if (shouldRun.get() == false) {
+ if (!shouldRun.compareAndSet(true, false)) {
return;
}
- super.initiateShutdown();
-
- shouldRun.set(false);
-
if (consumer != null) {
consumer.wakeup();
}
- super.awaitShutdown();
-
LOG.info("<== HookConsumer shutdown()");
}
}
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
index fdfc256..716f592 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
@@ -186,23 +186,19 @@
}
void consumeOneMessage(NotificationConsumer<HookNotification> consumer,
- NotificationHookConsumer.HookConsumer hookConsumer) throws InterruptedException {
- try {
- long startTime = System.currentTimeMillis(); //fetch starting time
+ NotificationHookConsumer.HookConsumer hookConsumer) {
+ long startTime = System.currentTimeMillis(); //fetch starting time
- while ((System.currentTimeMillis() - startTime) < 10000) {
- List<AtlasKafkaMessage<HookNotification>> messages = consumer.receive();
+ while ((System.currentTimeMillis() - startTime) < 10000) {
+ List<AtlasKafkaMessage<HookNotification>> messages = consumer.receive();
- for (AtlasKafkaMessage<HookNotification> msg : messages) {
- hookConsumer.handleMessage(msg);
- }
-
- if (messages.size() > 0) {
- break;
- }
+ for (AtlasKafkaMessage<HookNotification> msg : messages) {
+ hookConsumer.handleMessage(msg);
}
- } catch (AtlasServiceException | AtlasException e) {
- Assert.fail("Consumer failed with exception ", e);
+
+ if (messages.size() > 0) {
+ break;
+ }
}
}