blob: 31efef72300a6eb38f6cacfc0d01411068f6a05e [file] [log] [blame]
package org.apache.airavata.compute.resource.monitoring.job;
import org.apache.airavata.agents.api.AgentAdaptor;
import org.apache.airavata.agents.api.CommandOutput;
import org.apache.airavata.compute.resource.monitoring.job.output.OutputParser;
import org.apache.airavata.compute.resource.monitoring.job.output.OutputParserImpl;
import org.apache.airavata.compute.resource.monitoring.utils.Constants;
import org.apache.airavata.helix.core.support.adaptor.AdaptorSupportImpl;
import org.apache.airavata.helix.impl.task.submission.config.JobFactory;
import org.apache.airavata.helix.task.api.support.AdaptorSupport;
import org.apache.airavata.model.appcatalog.computeresource.*;
import org.apache.airavata.model.appcatalog.groupresourceprofile.ComputeResourcePolicy;
import org.apache.airavata.model.appcatalog.groupresourceprofile.GroupResourceProfile;
import org.apache.airavata.model.status.QueueStatusModel;
import org.apache.airavata.registry.api.RegistryService;
import org.apache.airavata.registry.api.RegistryService.Client;
import org.quartz.Job;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
/**
* Quartz job that executes the compute resource (CR) queue monitoring code.
*/
public class MonitoringJob extends ComputeResourceMonitor implements Job {

    private static final Logger LOGGER = LoggerFactory.getLogger(MonitoringJob.class);

    /** Command used to check a SLURM partition when no SHOW_CLUSTER_INFO command is configured. */
    private static final String DEFAULT_CLUSTER_INFO_COMMAND = "sinfo";

    /** Command used to count SLURM jobs when no running/pending job command is configured. */
    private static final String DEFAULT_JOB_QUEUE_COMMAND = "squeue";

    /**
     * Runs one monitoring pass over the slice of compute resources assigned to this job.
     *
     * <p>Reads the meta-scheduler gateway, group resource profile id, username, scanning job id
     * and number of parallel scanning jobs from the job data map. It then borrows a registry
     * client from {@code registryClientPool} and delegates to
     * {@link #executeComputeResourceMonitoring}. Any exception is logged and not rethrown. If the
     * failure happened after the client was borrowed, the client is returned as broken.
     *
     * @param jobExecutionContext Quartz context carrying the job data map
     * @throws JobExecutionException declared by {@link Job}; this implementation does not throw it
     */
    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        RegistryService.Client client = null;
        try {
            LOGGER.debug("Executing ComputeResources Monitoring Job....... ");
            JobDataMap jobDataMap = jobExecutionContext.getJobDetail().getJobDataMap();
            String metaSchedulerGateway = jobDataMap.getString(Constants.METASCHEDULER_GATEWAY);
            String metaSchedulerGRP = jobDataMap.getString(Constants.METASCHEDULER_GRP_ID);
            String username = jobDataMap.getString(Constants.METASCHEDULER_USERNAME);
            int jobId = jobDataMap.getInt(Constants.METASCHEDULER_SCANNING_JOB_ID);
            int parallelJobs = jobDataMap.getInt(Constants.METASCHEDULER_SCANNING_JOBS);
            LOGGER.debug("Main Gateway:{} Group Resource Profile: {} username: {} jobId: {} parallellJobs: {}",
                    metaSchedulerGateway, metaSchedulerGRP, username, jobId, parallelJobs);

            // Borrow the pooled client only after the job configuration has been read. That way an
            // unreadable configuration neither takes a client out of the pool nor marks one broken.
            client = this.registryClientPool.getResource();
            executeComputeResourceMonitoring(client, metaSchedulerGateway, username, metaSchedulerGRP,
                    parallelJobs, jobId);
        } catch (Exception ex) {
            LOGGER.error("Error occurred while executing job: {}", ex.getMessage(), ex);
            if (client != null) {
                registryClientPool.returnBrokenResource(client);
                // Prevent the finally block from returning the same client as healthy.
                client = null;
            }
        } finally {
            if (client != null) {
                registryClientPool.returnResource(client);
            }
        }
    }

    /**
     * Refreshes the queue status of every compute resource policy in this job's slice of the
     * group resource profile.
     *
     * <p>The profile's compute resource policies are split into {@code parallelJobs} contiguous
     * slices whose sizes differ by at most one, and slice number {@code jobId} is processed.
     * Every resource in the slice is attempted even if an earlier one fails. If any failed, the
     * first failure is rethrown after the loop, with later failures attached as suppressed
     * exceptions.
     *
     * @param client               registry client used for lookups and to register queue statuses
     * @param metaSchedulerGateway gateway id used to resolve credentials and adaptors
     * @param username             user whose credentials are used to reach the compute resources
     * @param metaSchedulerGRP     id of the group resource profile listing the compute resources
     * @param parallelJobs         total number of scanning jobs sharing the work; must be positive
     * @param jobId                zero-based index of this scanning job; must be in [0, parallelJobs)
     * @throws IllegalArgumentException if {@code parallelJobs} or {@code jobId} is out of range
     * @throws Exception                the first failure raised while updating a compute resource
     */
    private void executeComputeResourceMonitoring(RegistryService.Client client, String metaSchedulerGateway,
                                                  String username, String metaSchedulerGRP,
                                                  int parallelJobs, int jobId) throws Exception {
        GroupResourceProfile groupResourceProfile = getGroupResourceProfile(metaSchedulerGRP);
        List<ComputeResourcePolicy> allPolicies = groupResourceProfile.getComputeResourcePolicies();
        int size = allPolicies == null ? 0 : allPolicies.size();
        int[] bounds = sliceBounds(size, parallelJobs, jobId);
        if (bounds[0] == bounds[1]) {
            // Also covers a profile without any policies (null list), which must not reach subList.
            LOGGER.debug("No compute resource policies assigned to scanning job {} of {}", jobId, parallelJobs);
            return;
        }

        AdaptorSupport adaptorSupport = AdaptorSupportImpl.getInstance();
        Exception firstFailure = null;
        for (ComputeResourcePolicy computeResourcePolicy : allPolicies.subList(bounds[0], bounds[1])) {
            try {
                updateComputeResource(client, adaptorSupport, metaSchedulerGateway, username, computeResourcePolicy);
            } catch (Exception e) {
                // Keep going so that one unreachable resource does not stop monitoring of the rest.
                LOGGER.error("Failed to update queue status of compute resource {}",
                        computeResourcePolicy.getComputeResourceId(), e);
                if (firstFailure == null) {
                    firstFailure = e;
                } else if (firstFailure != e) {
                    firstFailure.addSuppressed(e);
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Computes the half-open index range {@code [start, end)} handled by scanning job
     * {@code jobId}. The range comes from splitting {@code size} items into {@code parallelJobs}
     * contiguous slices whose sizes differ by at most one. The last job always ends at
     * {@code size}, so the slices cover every item exactly once.
     *
     * @return a two-element array {@code {start, end}}
     * @throws IllegalArgumentException if {@code parallelJobs <= 0} or {@code jobId} is outside
     *                                  {@code [0, parallelJobs)}
     */
    private static int[] sliceBounds(int size, int parallelJobs, int jobId) {
        if (parallelJobs <= 0) {
            throw new IllegalArgumentException(
                    "Number of parallel scanning jobs must be positive, but was " + parallelJobs);
        }
        if (jobId < 0 || jobId >= parallelJobs) {
            throw new IllegalArgumentException(
                    "Scanning job id " + jobId + " is outside [0, " + parallelJobs + ")");
        }
        // Long arithmetic keeps jobId * size from overflowing.
        int start = (int) ((long) jobId * size / parallelJobs);
        int end = (int) ((long) (jobId + 1) * size / parallelJobs);
        return new int[] {start, end};
    }

    /**
     * Polls every allowed batch queue of one compute resource and registers the results with the
     * registry in a single call.
     *
     * <p>The job submission interface with the lowest priority order is used to resolve the
     * resource job manager. Only SLURM managers are polled for now. The method registers nothing
     * and returns early for non-SLURM managers, for resources without job submission interfaces
     * or a resolvable job manager, and for policies without allowed batch queues.
     *
     * @param client                registry client used for lookups and registration
     * @param adaptorSupport        source of the agent adaptor used to run commands on the resource
     * @param gatewayId             gateway id used to resolve credentials
     * @param username              user whose credentials are used
     * @param computeResourcePolicy policy naming the compute resource and its allowed batch queues
     * @throws Exception if a registry lookup, credential lookup or remote command fails
     */
    private void updateComputeResource(RegistryService.Client client, AdaptorSupport adaptorSupport,
                                       String gatewayId,
                                       String username,
                                       ComputeResourcePolicy computeResourcePolicy) throws Exception {
        String computeResourceId = computeResourcePolicy.getComputeResourceId();
        ComputeResourceDescription comResourceDes = client.getComputeResource(computeResourceId);
        List<JobSubmissionInterface> jobSubmissionInterfaces = comResourceDes.getJobSubmissionInterfaces();
        if (jobSubmissionInterfaces == null || jobSubmissionInterfaces.isEmpty()) {
            LOGGER.warn("Compute resource {} has no job submission interfaces; skipping queue monitoring",
                    computeResourceId);
            return;
        }
        // Pick the interface with the lowest priority order without reordering the fetched list.
        JobSubmissionInterface jobSubmissionInterface = Collections.min(jobSubmissionInterfaces,
                Comparator.comparingInt(JobSubmissionInterface::getPriorityOrder));
        JobSubmissionProtocol jobSubmissionProtocol = jobSubmissionInterface.getJobSubmissionProtocol();
        ResourceJobManager resourceJobManager =
                JobFactory.getResourceJobManager(client, jobSubmissionProtocol, jobSubmissionInterface);
        if (resourceJobManager == null || resourceJobManager.getResourceJobManagerType() == null) {
            LOGGER.warn("No resource job manager resolved for compute resource {}; skipping queue monitoring",
                    computeResourceId);
            return;
        }
        // TODO: initial phase we are only supporting SLURM
        if (!"SLURM".equals(resourceJobManager.getResourceJobManagerType().name())) {
            return;
        }
        List<String> allowedBatchQueues = computeResourcePolicy.getAllowedBatchQueues();
        if (allowedBatchQueues == null || allowedBatchQueues.isEmpty()) {
            LOGGER.debug("No allowed batch queues configured for compute resource {}", computeResourceId);
            return;
        }

        String groupResourceProfileId = computeResourcePolicy.getGroupResourceProfileId();
        // Credentials and the adaptor depend only on the compute resource, not on the queue,
        // so they are resolved once per resource instead of once per queue.
        String computeResourceToken = getComputeResourceCredentialToken(
                gatewayId, username, computeResourceId, false, true, groupResourceProfileId);
        String loginUsername = getComputeResourceLoginUserName(
                gatewayId, username, computeResourceId, false, true, groupResourceProfileId, null);
        AgentAdaptor adaptor = adaptorSupport.fetchAdaptor(
                gatewayId, computeResourceId, jobSubmissionProtocol, computeResourceToken, loginUsername);

        String clusterInfoCommand = jobManagerCommand(
                resourceJobManager, JobManagerCommand.SHOW_CLUSTER_INFO, DEFAULT_CLUSTER_INFO_COMMAND);
        String runningJobsCommand = jobManagerCommand(
                resourceJobManager, JobManagerCommand.SHOW_NO_OF_RUNNING_JOBS, DEFAULT_JOB_QUEUE_COMMAND);
        String pendingJobsCommand = jobManagerCommand(
                resourceJobManager, JobManagerCommand.SHOW_NO_OF_PENDING_JOBS, DEFAULT_JOB_QUEUE_COMMAND);
        OutputParser outputParser = new OutputParserImpl();
        String hostName = comResourceDes.getHostName();

        List<QueueStatusModel> queueStatusModels = new ArrayList<>(allowedBatchQueues.size());
        for (String queue : allowedBatchQueues) {
            queueStatusModels.add(fetchSlurmQueueStatus(adaptor, outputParser, hostName, queue,
                    clusterInfoCommand, runningJobsCommand, pendingJobsCommand));
        }
        client.registerQueueStatuses(queueStatusModels);
    }

    /**
     * Returns the command configured on the job manager for {@code command}. Falls back to
     * {@code defaultCommand} when the job manager has no command map or no value for that key.
     */
    private static String jobManagerCommand(ResourceJobManager resourceJobManager, JobManagerCommand command,
                                            String defaultCommand) {
        Map<JobManagerCommand, String> commands = resourceJobManager.getJobManagerCommands();
        if (commands == null) {
            return defaultCommand;
        }
        String configured = commands.get(command);
        return configured != null ? configured : defaultCommand;
    }

    /**
     * Queries one SLURM partition and builds its {@link QueueStatusModel}.
     *
     * <p>The partition counts as up when {@link OutputParser#isComputeResourceAvailable} accepts
     * the output of {@code <clusterInfoCommand> -p <queue>}. Running and pending job counts for
     * the partition are queried only when it is up; otherwise both are reported as 0.
     *
     * @throws Exception if a remote command or output parsing fails
     */
    private static QueueStatusModel fetchSlurmQueueStatus(AgentAdaptor adaptor, OutputParser outputParser,
                                                          String hostName, String queue,
                                                          String clusterInfoCommand,
                                                          String runningJobsCommand,
                                                          String pendingJobsCommand) throws Exception {
        // NOTE(review): queue names come from the compute resource policy and are concatenated
        // into shell commands unescaped; confirm they are restricted to safe partition names.
        String partitionFilter = " -p " + queue;
        CommandOutput clusterInfoOutput = adaptor.executeCommand(clusterInfoCommand + partitionFilter, null);
        boolean queueUp = outputParser.isComputeResourceAvailable(clusterInfoOutput,
                Constants.JOB_SUBMISSION_PROTOCOL_SLURM);
        int runningJobs = 0;
        int pendingJobs = 0;
        if (queueUp) {
            // The flags must be separated from the base command by a space. The partition filter
            // restricts the counts to this queue, matching the per-queue status model.
            CommandOutput runningJobsOutput = adaptor.executeCommand(
                    runningJobsCommand + partitionFilter + " -h -t running -r | wc -l", null);
            CommandOutput pendingJobsOutput = adaptor.executeCommand(
                    pendingJobsCommand + partitionFilter + " -h -t pending -r | wc -l", null);
            runningJobs = outputParser.getNumberofJobs(runningJobsOutput, Constants.JOB_SUBMISSION_PROTOCOL_SLURM);
            pendingJobs = outputParser.getNumberofJobs(pendingJobsOutput, Constants.JOB_SUBMISSION_PROTOCOL_SLURM);
        }
        QueueStatusModel queueStatusModel = new QueueStatusModel();
        queueStatusModel.setHostName(hostName);
        queueStatusModel.setQueueName(queue);
        queueStatusModel.setQueueUp(queueUp);
        queueStatusModel.setRunningJobs(runningJobs);
        queueStatusModel.setQueuedJobs(pendingJobs);
        queueStatusModel.setTime(System.currentTimeMillis());
        return queueStatusModel;
    }
}