blob: 73d98e79af98ea5ed294a396d875991bb1324b8d [file] [log] [blame]
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.odf.core.messaging.kafka;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.atlas.odf.core.controlcenter.ODFRunnable;
import org.apache.atlas.odf.core.controlcenter.QueueMessageProcessor;
import org.apache.atlas.odf.core.controlcenter.ServiceRuntime;
/**
* This consumer is started for a certain runtime and starts a KafkaQueueConsumer if
* the runtime is available.
*
*
*/
public class KafkaRuntimeConsumer implements ODFRunnable {
Logger logger = Logger.getLogger(KafkaRuntimeConsumer.class.getName());
private ServiceRuntime runtime;
private boolean isShutdown = false;
private ExecutorService executorService = null;
private KafkaQueueConsumer kafkaQueueConsumer = null;
private String topic;
private Properties config;
private QueueMessageProcessor processor;
private KafkaQueueConsumer.ConsumptionCallback callback = new KafkaQueueConsumer.ConsumptionCallback() {
@Override
public boolean stopConsumption() {
return isShutdown || (runtime.getWaitTimeUntilAvailable() > 0);
}
};
public KafkaRuntimeConsumer(ServiceRuntime runtime, String topicName, Properties config, QueueMessageProcessor processor) {
this.runtime = runtime;
this.processor = processor;
this.topic = topicName;
this.config = config;
}
@Override
public void run() {
logger.log(Level.INFO, "Starting runtime consumer for topic ''{0}''", topic);
while (!isShutdown) {
long waitTime = runtime.getWaitTimeUntilAvailable();
if (waitTime <= 0) {
logger.log(Level.INFO, "Starting Kafka consumer for topic ''{0}''", topic);
kafkaQueueConsumer = new KafkaQueueConsumer(topic, config, processor, callback);
kafkaQueueConsumer.setExecutorService(executorService);
// run consumer synchronously
kafkaQueueConsumer.run();
logger.log(Level.INFO, "Kafka consumer for topic ''{0}'' is finished", topic);
// if we are here, this means that the consumer was cancelled
// either directly or (more likely) through the Consumption callback
kafkaQueueConsumer = null;
} else {
try {
logger.log(Level.FINER, "Runtime ''{0}'' is not available, waiting for ''{1}''ms", new Object[]{runtime.getName(), waitTime });
Thread.sleep(waitTime);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
logger.log(Level.INFO, "Kafka runtime consumer for topic ''{0}'' has shut down", topic);
}
@Override
public void setExecutorService(ExecutorService executorService) {
this.executorService = executorService;
}
@Override
public void cancel() {
isShutdown = true;
if (kafkaQueueConsumer != null) {
kafkaQueueConsumer.cancel();
}
}
@Override
public boolean isReady() {
return true;
}
}