blob: 5d25f36ca316f7600865cdc31d1a5bb67e3a76fd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.vm.schedule;
import com.cloud.api.ApiGsonHelper;
import com.cloud.event.ActionEventUtils;
import com.cloud.event.EventTypes;
import com.cloud.user.User;
import com.cloud.utils.DateUtil;
import com.cloud.utils.component.ComponentContext;
import com.cloud.utils.component.ManagerBase;
import com.cloud.utils.db.GlobalLock;
import com.cloud.vm.UserVmManager;
import com.cloud.vm.VirtualMachine;
import com.google.common.primitives.Longs;
import org.apache.cloudstack.api.ApiCommandResourceType;
import org.apache.cloudstack.api.ApiConstants;
import org.apache.cloudstack.api.command.user.vm.RebootVMCmd;
import org.apache.cloudstack.api.command.user.vm.StartVMCmd;
import org.apache.cloudstack.api.command.user.vm.StopVMCmd;
import org.apache.cloudstack.framework.config.ConfigKey;
import org.apache.cloudstack.framework.config.Configurable;
import org.apache.cloudstack.framework.jobs.AsyncJobDispatcher;
import org.apache.cloudstack.framework.jobs.AsyncJobManager;
import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
import org.apache.cloudstack.managed.context.ManagedContextTimerTask;
import org.apache.cloudstack.vm.schedule.dao.VMScheduleDao;
import org.apache.cloudstack.vm.schedule.dao.VMScheduledJobDao;
import org.apache.commons.lang.time.DateUtils;
import org.apache.log4j.Logger;
import org.springframework.scheduling.support.CronExpression;
import javax.inject.Inject;
import javax.persistence.EntityExistsException;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Date;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
public class VMSchedulerImpl extends ManagerBase implements VMScheduler, Configurable {
private static Logger LOGGER = Logger.getLogger(VMSchedulerImpl.class);
@Inject
private VMScheduledJobDao vmScheduledJobDao;
@Inject
private VMScheduleDao vmScheduleDao;
@Inject
private UserVmManager userVmManager;
@Inject
private AsyncJobManager asyncJobManager;
private AsyncJobDispatcher asyncJobDispatcher;
private Timer vmSchedulerTimer;
private Date currentTimestamp;
private EnumMap<VMSchedule.Action, String> actionEventMap = new EnumMap<>(VMSchedule.Action.class);
public AsyncJobDispatcher getAsyncJobDispatcher() {
return asyncJobDispatcher;
}
public void setAsyncJobDispatcher(final AsyncJobDispatcher dispatcher) {
asyncJobDispatcher = dispatcher;
}
@Override
public ConfigKey<?>[] getConfigKeys() {
return new ConfigKey<?>[]{VMScheduledJobExpireInterval};
}
@Override
public String getConfigComponentName() {
return VMScheduler.class.getSimpleName();
}
@Override
public void removeScheduledJobs(List<Long> vmScheduleIds) {
if (vmScheduleIds == null || vmScheduleIds.isEmpty()) {
LOGGER.debug("Removed 0 scheduled jobs");
return;
}
Date now = new Date();
int rowsRemoved = vmScheduledJobDao.expungeJobsForSchedules(vmScheduleIds, now);
LOGGER.debug(String.format("Removed %s VM scheduled jobs", rowsRemoved));
}
@Override
public void updateScheduledJob(VMScheduleVO vmSchedule) {
removeScheduledJobs(Longs.asList(vmSchedule.getId()));
scheduleNextJob(vmSchedule, new Date());
}
@Override
public Date scheduleNextJob(VMScheduleVO vmSchedule, Date timestamp) {
if (!vmSchedule.getEnabled()) {
LOGGER.debug(String.format("VM Schedule [id=%s] for VM [id=%s] is disabled. Not scheduling next job.", vmSchedule.getUuid(), vmSchedule.getVmId()));
return null;
}
CronExpression cron = DateUtil.parseSchedule(vmSchedule.getSchedule());
Date startDate = vmSchedule.getStartDate();
Date endDate = vmSchedule.getEndDate();
VirtualMachine vm = userVmManager.getUserVm(vmSchedule.getVmId());
if (vm == null) {
LOGGER.info(String.format("VM [id=%s] is removed. Disabling VM schedule [id=%s].", vmSchedule.getVmId(), vmSchedule.getUuid()));
vmSchedule.setEnabled(false);
vmScheduleDao.persist(vmSchedule);
return null;
}
ZonedDateTime now;
if (timestamp != null) {
now = ZonedDateTime.ofInstant(timestamp.toInstant(), vmSchedule.getTimeZoneId());
} else {
now = ZonedDateTime.now(vmSchedule.getTimeZoneId());
}
ZonedDateTime zonedStartDate = ZonedDateTime.ofInstant(startDate.toInstant(), vmSchedule.getTimeZoneId());
ZonedDateTime zonedEndDate = null;
if (endDate != null) {
zonedEndDate = ZonedDateTime.ofInstant(endDate.toInstant(), vmSchedule.getTimeZoneId());
}
if (zonedEndDate != null && now.isAfter(zonedEndDate)) {
LOGGER.info(String.format("End time is less than current time. Disabling VM schedule [id=%s] for VM [id=%s].", vmSchedule.getUuid(), vmSchedule.getVmId()));
vmSchedule.setEnabled(false);
vmScheduleDao.persist(vmSchedule);
return null;
}
ZonedDateTime ts = null;
if (zonedStartDate.isAfter(now)) {
ts = cron.next(zonedStartDate);
} else {
ts = cron.next(now);
}
if (ts == null) {
LOGGER.info(String.format("No next schedule found. Disabling VM schedule [id=%s] for VM [id=%s].", vmSchedule.getUuid(), vmSchedule.getVmId()));
vmSchedule.setEnabled(false);
vmScheduleDao.persist(vmSchedule);
return null;
}
Date scheduledDateTime = Date.from(ts.toInstant());
VMScheduledJobVO scheduledJob = new VMScheduledJobVO(vmSchedule.getVmId(), vmSchedule.getId(), vmSchedule.getAction(), scheduledDateTime);
try {
vmScheduledJobDao.persist(scheduledJob);
ActionEventUtils.onScheduledActionEvent(User.UID_SYSTEM, vm.getAccountId(), actionEventMap.get(vmSchedule.getAction()),
String.format("Scheduled action (%s) [vmId: %s scheduleId: %s] at %s", vmSchedule.getAction(), vm.getUuid(), vmSchedule.getUuid(), scheduledDateTime),
vm.getId(), ApiCommandResourceType.VirtualMachine.toString(), true, 0);
} catch (EntityExistsException exception) {
LOGGER.debug("Job is already scheduled.");
}
return scheduledDateTime;
}
@Override
public boolean start() {
actionEventMap.put(VMSchedule.Action.START, EventTypes.EVENT_VM_SCHEDULE_START);
actionEventMap.put(VMSchedule.Action.STOP, EventTypes.EVENT_VM_SCHEDULE_STOP);
actionEventMap.put(VMSchedule.Action.REBOOT, EventTypes.EVENT_VM_SCHEDULE_REBOOT);
actionEventMap.put(VMSchedule.Action.FORCE_STOP, EventTypes.EVENT_VM_SCHEDULE_FORCE_STOP);
actionEventMap.put(VMSchedule.Action.FORCE_REBOOT, EventTypes.EVENT_VM_SCHEDULE_FORCE_REBOOT);
// Adding 1 minute to currentTimestamp to ensure that
// jobs which were to be run at current time, doesn't cause issues
currentTimestamp = DateUtils.addMinutes(new Date(), 1);
scheduleNextJobs(currentTimestamp);
final TimerTask schedulerPollTask = new ManagedContextTimerTask() {
@Override
protected void runInContext() {
try {
poll(new Date());
} catch (final Throwable t) {
LOGGER.warn("Catch throwable in VM scheduler ", t);
}
}
};
vmSchedulerTimer = new Timer("VMSchedulerPollTask");
vmSchedulerTimer.scheduleAtFixedRate(schedulerPollTask, 5000L, 60 * 1000L);
return true;
}
@Override
public void poll(Date timestamp) {
currentTimestamp = DateUtils.round(timestamp, Calendar.MINUTE);
String displayTime = DateUtil.displayDateInTimezone(DateUtil.GMT_TIMEZONE, currentTimestamp);
LOGGER.debug(String.format("VM scheduler.poll is being called at %s", displayTime));
GlobalLock scanLock = GlobalLock.getInternLock("vmScheduler.poll");
try {
if (scanLock.lock(30)) {
try {
scheduleNextJobs(currentTimestamp);
} finally {
scanLock.unlock();
}
}
} finally {
scanLock.releaseRef();
}
scanLock = GlobalLock.getInternLock("vmScheduler.poll");
try {
if (scanLock.lock(30)) {
try {
startJobs(); // Create async job and update scheduled job
} finally {
scanLock.unlock();
}
}
} finally {
scanLock.releaseRef();
}
try {
cleanupVMScheduledJobs();
} catch (Exception e) {
LOGGER.warn("Error in cleaning up vm scheduled jobs", e);
}
}
private void scheduleNextJobs(Date timestamp) {
for (final VMScheduleVO schedule : vmScheduleDao.listAllActiveSchedules()) {
try {
scheduleNextJob(schedule, timestamp);
} catch (Exception e) {
LOGGER.warn("Error in scheduling next job for schedule " + schedule.getUuid(), e);
}
}
}
/**
* Delete scheduled jobs before vm.scheduler.expire.interval days
*/
private void cleanupVMScheduledJobs() {
Date deleteBeforeDate = DateUtils.addDays(currentTimestamp, -1 * VMScheduledJobExpireInterval.value());
int rowsRemoved = vmScheduledJobDao.expungeJobsBefore(deleteBeforeDate);
LOGGER.info(String.format("Cleaned up %d VM scheduled job entries", rowsRemoved));
}
void executeJobs(Map<Long, VMScheduledJob> jobsToExecute) {
String displayTime = DateUtil.displayDateInTimezone(DateUtil.GMT_TIMEZONE, currentTimestamp);
for (Map.Entry<Long, VMScheduledJob> entry : jobsToExecute.entrySet()) {
VMScheduledJob vmScheduledJob = entry.getValue();
VirtualMachine vm = userVmManager.getUserVm(vmScheduledJob.getVmId());
VMScheduledJobVO tmpVMScheduleJob = null;
try {
if (LOGGER.isDebugEnabled()) {
final Date scheduledTimestamp = vmScheduledJob.getScheduledTime();
displayTime = DateUtil.displayDateInTimezone(DateUtil.GMT_TIMEZONE, scheduledTimestamp);
LOGGER.debug(String.format("Executing %s for VM id %d for schedule id: %d at %s", vmScheduledJob.getAction(), vmScheduledJob.getVmId(), vmScheduledJob.getVmScheduleId(), displayTime));
}
tmpVMScheduleJob = vmScheduledJobDao.acquireInLockTable(vmScheduledJob.getId());
Long jobId = processJob(vmScheduledJob, vm);
if (jobId != null) {
tmpVMScheduleJob.setAsyncJobId(jobId);
vmScheduledJobDao.update(vmScheduledJob.getId(), tmpVMScheduleJob);
}
} catch (final Exception e) {
LOGGER.warn(String.format("Executing scheduled job id: %s failed due to %s", vmScheduledJob.getId(), e));
} finally {
if (tmpVMScheduleJob != null) {
vmScheduledJobDao.releaseFromLockTable(vmScheduledJob.getId());
}
}
}
}
Long processJob(VMScheduledJob vmScheduledJob, VirtualMachine vm) {
if (!Arrays.asList(VirtualMachine.State.Running, VirtualMachine.State.Stopped).contains(vm.getState())) {
LOGGER.info(String.format("Skipping action (%s) for [vmId:%s scheduleId: %s] because VM is invalid state: %s", vmScheduledJob.getAction(), vm.getUuid(), vmScheduledJob.getVmScheduleId(), vm.getState()));
return null;
}
final Long eventId = ActionEventUtils.onCompletedActionEvent(User.UID_SYSTEM, vm.getAccountId(), null,
actionEventMap.get(vmScheduledJob.getAction()), true,
String.format("Executing action (%s) for VM Id:%s", vmScheduledJob.getAction(), vm.getUuid()),
vm.getId(), ApiCommandResourceType.VirtualMachine.toString(), 0);
if (vm.getState() == VirtualMachine.State.Running) {
switch (vmScheduledJob.getAction()) {
case STOP:
return executeStopVMJob(vm, false, eventId);
case FORCE_STOP:
return executeStopVMJob(vm, true, eventId);
case REBOOT:
return executeRebootVMJob(vm, false, eventId);
case FORCE_REBOOT:
return executeRebootVMJob(vm, true, eventId);
}
} else if (vm.getState() == VirtualMachine.State.Stopped && vmScheduledJob.getAction() == VMSchedule.Action.START) {
return executeStartVMJob(vm, eventId);
}
LOGGER.warn(String.format("Skipping action (%s) for [vmId:%s scheduleId: %s] because VM is in state: %s",
vmScheduledJob.getAction(), vm.getUuid(), vmScheduledJob.getVmScheduleId(), vm.getState()));
return null;
}
private void skipJobs(Map<Long, VMScheduledJob> jobsToExecute, Map<Long, List<VMScheduledJob>> jobsNotToExecute) {
for (Map.Entry<Long, List<VMScheduledJob>> entry : jobsNotToExecute.entrySet()) {
Long vmId = entry.getKey();
List<VMScheduledJob> skippedVmScheduledJobVOS = entry.getValue();
VirtualMachine vm = userVmManager.getUserVm(vmId);
for (final VMScheduledJob skippedVmScheduledJobVO : skippedVmScheduledJobVOS) {
VMScheduledJob scheduledJob = jobsToExecute.get(vmId);
LOGGER.info(String.format("Skipping scheduled job [id: %s, vmId: %s] because of conflict with another scheduled job [id: %s]", skippedVmScheduledJobVO.getUuid(), vm.getUuid(), scheduledJob.getUuid()));
}
}
}
/**
* Create async jobs for VM scheduled jobs
*/
private void startJobs() {
String displayTime = DateUtil.displayDateInTimezone(DateUtil.GMT_TIMEZONE, currentTimestamp);
final List<VMScheduledJobVO> vmScheduledJobs = vmScheduledJobDao.listJobsToStart(currentTimestamp);
LOGGER.debug(String.format("Got %d scheduled jobs to be executed at %s", vmScheduledJobs.size(), displayTime));
Map<Long, VMScheduledJob> jobsToExecute = new HashMap<>();
Map<Long, List<VMScheduledJob>> jobsNotToExecute = new HashMap<>();
for (final VMScheduledJobVO vmScheduledJobVO : vmScheduledJobs) {
long vmId = vmScheduledJobVO.getVmId();
if (jobsToExecute.get(vmId) == null) {
jobsToExecute.put(vmId, vmScheduledJobVO);
} else {
jobsNotToExecute.computeIfAbsent(vmId, k -> new ArrayList<>()).add(vmScheduledJobVO);
}
}
executeJobs(jobsToExecute);
skipJobs(jobsToExecute, jobsNotToExecute);
}
long executeStartVMJob(VirtualMachine vm, long eventId) {
final Map<String, String> params = new HashMap<>();
params.put(ApiConstants.ID, String.valueOf(vm.getId()));
params.put("ctxUserId", "1");
params.put("ctxAccountId", String.valueOf(vm.getAccountId()));
params.put(ApiConstants.CTX_START_EVENT_ID, String.valueOf(eventId));
final StartVMCmd cmd = new StartVMCmd();
ComponentContext.inject(cmd);
AsyncJobVO job = new AsyncJobVO("", User.UID_SYSTEM, vm.getAccountId(), StartVMCmd.class.getName(), ApiGsonHelper.getBuilder().create().toJson(params), vm.getId(), cmd.getApiResourceType() != null ? cmd.getApiResourceType().toString() : null, null);
job.setDispatcher(asyncJobDispatcher.getName());
return asyncJobManager.submitAsyncJob(job);
}
long executeStopVMJob(VirtualMachine vm, boolean isForced, long eventId) {
final Map<String, String> params = new HashMap<>();
params.put(ApiConstants.ID, String.valueOf(vm.getId()));
params.put("ctxUserId", "1");
params.put("ctxAccountId", String.valueOf(vm.getAccountId()));
params.put(ApiConstants.CTX_START_EVENT_ID, String.valueOf(eventId));
params.put(ApiConstants.FORCED, String.valueOf(isForced));
final StopVMCmd cmd = new StopVMCmd();
ComponentContext.inject(cmd);
AsyncJobVO job = new AsyncJobVO("", User.UID_SYSTEM, vm.getAccountId(), StopVMCmd.class.getName(), ApiGsonHelper.getBuilder().create().toJson(params), vm.getId(), cmd.getApiResourceType() != null ? cmd.getApiResourceType().toString() : null, null);
job.setDispatcher(asyncJobDispatcher.getName());
return asyncJobManager.submitAsyncJob(job);
}
long executeRebootVMJob(VirtualMachine vm, boolean isForced, long eventId) {
final Map<String, String> params = new HashMap<>();
params.put(ApiConstants.ID, String.valueOf(vm.getId()));
params.put("ctxUserId", "1");
params.put("ctxAccountId", String.valueOf(vm.getAccountId()));
params.put(ApiConstants.CTX_START_EVENT_ID, String.valueOf(eventId));
params.put(ApiConstants.FORCED, String.valueOf(isForced));
final RebootVMCmd cmd = new RebootVMCmd();
ComponentContext.inject(cmd);
AsyncJobVO job = new AsyncJobVO("", User.UID_SYSTEM, vm.getAccountId(), RebootVMCmd.class.getName(), ApiGsonHelper.getBuilder().create().toJson(params), vm.getId(), cmd.getApiResourceType() != null ? cmd.getApiResourceType().toString() : null, null);
job.setDispatcher(asyncJobDispatcher.getName());
return asyncJobManager.submitAsyncJob(job);
}
}