blob: 40721707ee2c9fcd1596227dd3988c72b238d26a [file] [log] [blame]
package org.apache.airavata.metascheduler.metadata.analyzer.impl;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.common.utils.ThriftClientPool;
import org.apache.airavata.metascheduler.core.engine.DataAnalyzer;
import org.apache.airavata.metascheduler.core.utils.Utils;
import org.apache.airavata.model.status.JobState;
import org.apache.airavata.model.status.JobStatus;
import org.apache.airavata.registry.api.RegistryService;
import org.apache.airavata.registry.api.RegistryService.Client;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
/**
 * {@link DataAnalyzer} implementation that queries the registry service for the
 * number of jobs in the {@link JobState#SUBMITTED} state over the last 5, 10 and
 * 15 minutes, logs a per-window average derived from those counts, and then logs
 * the average time distribution the registry reports for the last 15 minutes.
 *
 * <p>Failures are logged and swallowed; {@link #execute(JobExecutionContext)} does
 * not propagate them to the caller.
 */
public class DataAnalyzerImpl implements DataAnalyzer {

    private static final Logger LOGGER = LoggerFactory.getLogger(DataAnalyzerImpl.class);

    private static final int SECONDS_PER_MINUTE = 60;

    protected static ThriftClientPool<RegistryService.Client> registryClientPool = Utils.getRegistryServiceClientPool();

    /**
     * Runs one analysis pass: borrows a registry client from the pool, collects the
     * job counts and the time distribution, logs them, and hands the client back.
     *
     * @param jobExecutionContext execution context supplied by the scheduler; not read here
     * @throws JobExecutionException declared by the interface; this implementation
     *                               catches and logs every exception instead of throwing
     */
    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        RegistryService.Client client = null;
        // Set when a call failed, so the client is returned to the pool as broken.
        boolean broken = false;
        try {
            LOGGER.debug("Executing Data Analyzer ....... ");
            client = registryClientPool.getResource();

            //TODO: handle multiple gateways
            String gateway = ServerSettings.getDataAnalyzingEnabledGateways();

            JobStatus jobStatus = new JobStatus();
            jobStatus.setJobState(JobState.SUBMITTED);

            // NOTE(review): presumably the analyzer interval in seconds - confirm against ServerSettings.
            double timeStep = ServerSettings.getDataAnalyzerTimeStep();

            double fiveMinuteAverage = averageOverWindow(client, jobStatus, gateway, 5, timeStep);
            double tenMinuteAverage = averageOverWindow(client, jobStatus, gateway, 10, timeStep);
            double fifteenMinuteAverage = averageOverWindow(client, jobStatus, gateway, 15, timeStep);

            LOGGER.info("service rate: 5 min avg {} 10 min avg {} 15 min avg {}",
                    fiveMinuteAverage, tenMinuteAverage, fifteenMinuteAverage);

            Map<String, Double> timeDistribution = client.getAVGTimeDistribution(gateway, 15);
            StringBuilder distribution = new StringBuilder();
            for (Map.Entry<String, Double> entry : timeDistribution.entrySet()) {
                distribution.append(" avg time ")
                        .append(entry.getKey())
                        .append(" : ")
                        .append(entry.getValue());
            }
            LOGGER.info(distribution.toString());
        } catch (Exception ex) {
            broken = true;
            String msg = "Error occurred while executing data analyzer: " + ex.getMessage();
            LOGGER.error(msg, ex);
        } finally {
            // client is still null when borrowing from the pool itself failed.
            if (client != null) {
                if (broken) {
                    registryClientPool.returnBrokenResource(client);
                } else {
                    registryClientPool.returnResource(client);
                }
            }
        }
    }

    /**
     * Fetches the job count for the given window and scales it to
     * {@code count * timeStep / (windowMinutes * 60)}.
     *
     * <p>The window length is used for both the registry query and the divisor so
     * the two cannot drift apart.
     */
    private static double averageOverWindow(RegistryService.Client client, JobStatus jobStatus, String gateway,
                                            int windowMinutes, double timeStep) throws Exception {
        int jobCount = client.getJobCount(jobStatus, gateway, windowMinutes);
        return jobCount * timeStep / (windowMinutes * SECONDS_PER_MINUTE);
    }
}