/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.odf.core.test.messaging.kafka;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Logger;

import org.apache.atlas.odf.api.ODFFactory;
import org.apache.atlas.odf.api.analysis.AnalysisRequestTrackerStatus.STATUS;
import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker;
import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties;
import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest;
import org.apache.atlas.odf.api.engine.ThreadStatus.ThreadState;
import org.apache.atlas.odf.api.settings.MessagingConfiguration;
import org.apache.atlas.odf.api.settings.ODFSettings;
import org.apache.atlas.odf.api.settings.SettingsManager;
import org.apache.atlas.odf.api.settings.validation.ValidationException;
import org.apache.atlas.odf.core.Environment;
import org.apache.atlas.odf.core.ODFInternalFactory;
import org.apache.atlas.odf.core.controlcenter.AnalysisRequestTrackerStore;
import org.apache.atlas.odf.core.controlcenter.DefaultStatusQueueStore;
import org.apache.atlas.odf.core.controlcenter.DefaultThreadManager;
import org.apache.atlas.odf.core.controlcenter.QueueMessageProcessor;
import org.apache.atlas.odf.core.controlcenter.StatusQueueEntry;
import org.apache.atlas.odf.core.controlcenter.ThreadManager.ThreadStartupResult;
import org.apache.atlas.odf.core.controlcenter.TrackerUtil;
import org.apache.atlas.odf.core.messaging.kafka.KafkaMonitor;
import org.apache.atlas.odf.core.messaging.kafka.KafkaProducerManager;
import org.apache.atlas.odf.core.messaging.kafka.KafkaQueueConsumer;
import org.apache.atlas.odf.core.messaging.kafka.KafkaQueueManager;
import org.apache.atlas.odf.core.test.ODFTestBase;
import org.apache.atlas.odf.core.test.ODFTestLogger;
import org.apache.atlas.odf.json.JSONUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.wink.json4j.JSONException;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
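
/**
 * Tests for the Kafka-based ODF queue manager: enqueueing trackers on the status queue
 * and cleaning up stuck analysis requests after a restart.
 */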
public class KafkaQueueManagerTest extends ODFTestBase {

    private static Long origRetention;

    Logger logger = ODFTestLogger.get();
    String zookeeperConnectString = new ODFInternalFactory().create(Environment.class).getZookeeperConnectString();

    @BeforeClass
    public static void setupTrackerRetention() throws ValidationException {
        SettingsManager settingsManager = new ODFFactory().create().getSettingsManager();
        // Increase the request retention so that trackers are kept around for the duration of the test.
        ODFSettings settings = settingsManager.getODFSettings();
        final MessagingConfiguration messagingConfiguration = settings.getMessagingConfiguration();
        origRetention = messagingConfiguration.getAnalysisRequestRetentionMs();
        messagingConfiguration.setAnalysisRequestRetentionMs(120000000L);
        settingsManager.updateODFSettings(settings);
        ODFTestLogger.get().info("Set request retention to " + settingsManager.getODFSettings().getMessagingConfiguration().getAnalysisRequestRetentionMs());
    }

    @AfterClass
    public static void cleanupTrackerRetention() throws ValidationException {
        SettingsManager settingsManager = new ODFFactory().create().getSettingsManager();
        ODFSettings settings = settingsManager.getODFSettings();
        settings.getMessagingConfiguration().setAnalysisRequestRetentionMs(origRetention);
        settingsManager.updateODFSettings(settings);
    }

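    /**
     * Enqueues a number of trackers on the status queue and verifies that each of them
     * can afterwards be queried from the tracker store with status FINISHED.
     */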
    @Test
    public void testStatusQueue() throws Exception {
        KafkaQueueManager kqm = new KafkaQueueManager();
        logger.info("Queue manager created");
        AnalysisRequestTracker tracker = JSONUtils.readJSONObjectFromFileInClasspath(AnalysisRequestTracker.class, "org/apache/atlas/odf/core/test/messaging/kafka/tracker1.json", null);
        long before = System.currentTimeMillis();
        tracker.setLastModified(before);
        int maxEntries = 10;
        for (int i = 0; i < maxEntries; i++) {
            tracker.getRequest().setId("id" + i);
            StatusQueueEntry sqe = new StatusQueueEntry();
            sqe.setAnalysisRequestTracker(tracker);
            kqm.enqueueInStatusQueue(sqe);
        }
        long after = System.currentTimeMillis();
        logger.info("Time for enqueueing " + maxEntries + " objects: " + (after - before) + "ms, " + ((after - before) / maxEntries) + "ms per object");
        // Give the status queue consumer time to process all entries before querying the store.
        Thread.sleep(100 * maxEntries);
        AnalysisRequestTrackerStore store = new DefaultStatusQueueStore();
        for (int i = 0; i < maxEntries; i++) {
            logger.info("Querying status " + i);
            AnalysisRequestTracker queriedTracker = store.query("id" + i);
            Assert.assertNotNull(queriedTracker);
            Assert.assertEquals(STATUS.FINISHED, queriedTracker.getStatus());
        }
        logger.info("Test testStatusQueue finished");
    }

    /**
     * This test verifies that stuck analysis requests are cleaned up when the queue manager restarts.
     * It creates a tracker, puts it on the status queue, kills the discovery service consumer, and starts
     * a dummy consumer that moves the service consumer group's offset past the new tracker without
     * processing it. The status queue consumer is then shut down and its offset is reset so that it
     * re-consumes from the start on restart, thereby cleaning up stuck requests. Finally, the Kafka
     * queue manager is re-initialized, which brings all consumers back up and triggers the cleanup.
     */
    @Test
    @Ignore("Adjust once ServiceRuntimes are fully implemented")
    public void testStuckRequestCleanup() throws JSONException, InterruptedException, ExecutionException, TimeoutException {
        final AnalysisRequestTracker tracker = JSONUtils.readJSONObjectFromFileInClasspath(AnalysisRequestTracker.class, "org/apache/atlas/odf/core/test/messaging/kafka/tracker1.json", null);
        tracker.setStatus(STATUS.IN_DISCOVERY_SERVICE_QUEUE);
        tracker.setNextDiscoveryServiceRequest(0);
        tracker.setLastModified(System.currentTimeMillis());
        final String newTrackerId = "KAFKA_QUEUE_MANAGER_09_TEST" + UUID.randomUUID().toString();
        tracker.getRequest().setId(newTrackerId);
        DiscoveryServiceRequest dsRequest = TrackerUtil.getCurrentDiscoveryServiceStartRequest(tracker);
        final DiscoveryServiceProperties discoveryServiceRegistrationInfo = new ODFFactory().create().getDiscoveryServiceManager().getDiscoveryServicesProperties().get(0);
        dsRequest.setDiscoveryServiceId(discoveryServiceRegistrationInfo.getId());
        String dsID = dsRequest.getDiscoveryServiceId();
        String topicName = KafkaQueueManager.SERVICE_TOPIC_PREFIX + dsID;
        // Add the tracker to the queue and move the consumer offset past the request so that it gets cleaned up.
        String consumerGroupId = "odf-topic-" + dsID + "_group";
        String threadName = "Dummy_DiscoveryServiceQueueConsumer" + topicName;
        final List<Throwable> multiThreadErrors = new ArrayList<Throwable>();
        final DefaultThreadManager tm = new DefaultThreadManager();
        logger.info("Shutting down old test 09 consumer and replacing it with a fake one that does nothing");
        for (int no = 0; no < discoveryServiceRegistrationInfo.getParallelismCount(); no++) {
            tm.shutdownThreads(Collections.singletonList("DiscoveryServiceQueueConsumer" + topicName + "_" + no));
        }
        Properties kafkaConsumerProps = getKafkaConsumerConfigProperties(consumerGroupId);
        final long[] producedMsgOffset = new long[1];
        final CountDownLatch msgProcessingLatch = new CountDownLatch(1);
        ThreadStartupResult created = tm.startUnmanagedThread(threadName, new KafkaQueueConsumer(topicName, kafkaConsumerProps, new QueueMessageProcessor() {
            @Override
            public void process(ExecutorService executorService, String msg, int partition, long msgOffset) {
                logger.info("Dequeue without processing " + msgOffset);
                if (msgOffset == producedMsgOffset[0]) {
                    try {
                        msgProcessingLatch.countDown();
                    } catch (Exception e) {
                        msgProcessingLatch.countDown();
                        multiThreadErrors.add(e);
                    }
                }
            }
        }));
        tm.waitForThreadsToBeReady(30000, Arrays.asList(created));
        String key = tracker.getRequest().getId();
        String value = JSONUtils.toJSON(tracker);
        new DefaultStatusQueueStore().store(tracker);
        KafkaMonitor kafkaMonitor = new ODFInternalFactory().create(KafkaMonitor.class);
        List<String> origQueueConsumers = kafkaMonitor.getConsumerGroups(zookeeperConnectString, KafkaQueueManager.TOPIC_NAME_STATUS_QUEUE);
        logger.info("Found status consumers: " + origQueueConsumers.toString() + ", shutting down StatusWatcher");
        // Kill the status queue watcher so that it is restarted when the queue manager is initialized and detects stuck requests.
        tm.shutdownThreads(Collections.singletonList("StatusWatcher" + KafkaQueueManager.TOPIC_NAME_STATUS_QUEUE + "_0"));
        int maxWaitForConsumerDeath = 60;
        ThreadState watcherState = tm.getStateOfUnmanagedThread("StatusWatcher" + KafkaQueueManager.TOPIC_NAME_STATUS_QUEUE + "_0");
        while (watcherState != ThreadState.NON_EXISTENT && watcherState != ThreadState.FINISHED && maxWaitForConsumerDeath > 0) {
            maxWaitForConsumerDeath--;
            Thread.sleep(500);
            watcherState = tm.getStateOfUnmanagedThread("StatusWatcher" + KafkaQueueManager.TOPIC_NAME_STATUS_QUEUE + "_0");
        }
        logger.info("StatusWatcher state after shutdown (remaining retries: " + maxWaitForConsumerDeath + "): " + watcherState);
        logger.info("Resetting the offset for the status consumer so that it re-consumes when restarting");
        final int offset = 1000000;
        for (String statusConsumerGroup : origQueueConsumers) {
            if (statusConsumerGroup.contains("DSStatusWatcherConsumerGroup")) {
                boolean success = false;
                int retryCount = 0;
                final int maxOffsetRetry = 20;
                while (!success && retryCount < maxOffsetRetry) {
                    success = kafkaMonitor.setOffset(zookeeperConnectString, statusConsumerGroup, KafkaQueueManager.TOPIC_NAME_STATUS_QUEUE, 0, offset);
                    retryCount++;
                    Thread.sleep(500);
                }
                Assert.assertNotEquals(maxOffsetRetry, retryCount);
                Assert.assertTrue(success);
            }
        }
        new ODFInternalFactory().create(KafkaProducerManager.class).sendMsg(topicName, key, value, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                producedMsgOffset[0] = metadata.offset();
            }
        });
        final boolean await = msgProcessingLatch.await(240, TimeUnit.SECONDS);
        Assert.assertTrue(await);
        if (await) {
            logger.info("Running checks after message consumption...");
            AnalysisRequestTrackerStore store = new ODFInternalFactory().create(AnalysisRequestTrackerStore.class);
            AnalysisRequestTracker storeTracker = store.query(tracker.getRequest().getId());
            Assert.assertEquals(tracker.getRequest().getId(), storeTracker.getRequest().getId());
            Assert.assertEquals(STATUS.IN_DISCOVERY_SERVICE_QUEUE, storeTracker.getStatus());
            // Restart ODF so that the cleanup is triggered.
            logger.info("Shutting down all threads and restarting ODF");
            tm.shutdownAllUnmanagedThreads();
            int threadKillRetry = 0;
            while (tm.getNumberOfRunningThreads() > 0 && threadKillRetry < 20) {
                Thread.sleep(500);
                threadKillRetry++;
            }
            logger.info("All threads down, restarting ODF " + threadKillRetry);
            // Initialize the analysis manager, which brings all consumers back up.
            new ODFFactory().create().getAnalysisManager();
            kafkaMonitor = new ODFInternalFactory().create(KafkaMonitor.class);
            origQueueConsumers = kafkaMonitor.getConsumerGroups(zookeeperConnectString, KafkaQueueManager.TOPIC_NAME_STATUS_QUEUE);
            int healthRetrieveRetry = 0;
            // Wait up to 120 seconds for the status consumer to come up. Once it is up, ODF has been restarted successfully.
            while (origQueueConsumers.isEmpty() && healthRetrieveRetry < 240) {
                healthRetrieveRetry++;
                Thread.sleep(500);
                origQueueConsumers = kafkaMonitor.getConsumerGroups(zookeeperConnectString, KafkaQueueManager.TOPIC_NAME_STATUS_QUEUE);
            }
            Assert.assertNotEquals(240, healthRetrieveRetry);
            logger.info("Initialized, waiting for cleanup ... " + healthRetrieveRetry);
            Thread.sleep(5000);
            logger.info("Found health consumers: " + origQueueConsumers.toString());
            // The stuck request should now have been marked as failed by the cleanup.
            AnalysisRequestTracker storedTracker = store.query(tracker.getRequest().getId());
            Assert.assertEquals(STATUS.ERROR, storedTracker.getStatus());
            logger.info("Done cleaning up, all fine");
        }
        Assert.assertEquals(0, multiThreadErrors.size());
    }

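    /**
     * Builds the Kafka consumer properties for the given consumer group: string deserializers,
     * earliest offset reset, disabled auto-commit, and the bootstrap servers retrieved
     * from ZooKeeper via the KafkaMonitor.
     */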
    public Properties getKafkaConsumerConfigProperties(String consumerGroupID) {
        SettingsManager odfConfig = new ODFFactory().create().getSettingsManager();
        Properties kafkaConsumerProps = odfConfig.getKafkaConsumerProperties();
        kafkaConsumerProps.put("group.id", consumerGroupID);
        if (zookeeperConnectString != null) {
            kafkaConsumerProps.put("zookeeper.connect", zookeeperConnectString);
        }
        kafkaConsumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        kafkaConsumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaConsumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // Join the broker list retrieved from ZooKeeper into a comma-separated bootstrap servers string.
        StringBuilder bld = new StringBuilder();
        final Iterator<String> iterator = new ODFInternalFactory().create(KafkaMonitor.class).getBrokers(zookeeperConnectString).iterator();
        while (iterator.hasNext()) {
            bld.append(iterator.next());
            if (iterator.hasNext()) {
                bld.append(",");
            }
        }
        kafkaConsumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bld.toString());
        kafkaConsumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return kafkaConsumerProps;
    }
}