blob: d4fe08397f9e17453a3925b94ac024b499463222 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package com.cloud.storage.snapshot;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import javax.naming.ConfigurationException;
import org.apache.cloudstack.api.ApiCommandResourceType;
import org.apache.cloudstack.api.ApiConstants;
import org.apache.cloudstack.api.command.user.snapshot.CreateSnapshotCmd;
import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
import org.apache.cloudstack.framework.jobs.AsyncJobDispatcher;
import org.apache.cloudstack.framework.jobs.AsyncJobManager;
import org.apache.cloudstack.framework.jobs.dao.AsyncJobDao;
import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
import org.apache.cloudstack.managed.context.ManagedContextTimerTask;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Component;
import com.cloud.api.ApiDispatcher;
import com.cloud.api.ApiGsonHelper;
import com.cloud.event.ActionEventUtils;
import com.cloud.event.EventTypes;
import com.cloud.server.ResourceTag;
import com.cloud.server.TaggedResourceService;
import com.cloud.storage.Snapshot;
import com.cloud.storage.SnapshotPolicyVO;
import com.cloud.storage.SnapshotScheduleVO;
import com.cloud.storage.SnapshotVO;
import com.cloud.storage.VolumeVO;
import com.cloud.storage.dao.SnapshotDao;
import com.cloud.storage.dao.SnapshotPolicyDao;
import com.cloud.storage.dao.SnapshotScheduleDao;
import com.cloud.storage.dao.VolumeDao;
import com.cloud.user.Account;
import com.cloud.user.User;
import com.cloud.user.dao.AccountDao;
import com.cloud.utils.DateUtil;
import com.cloud.utils.DateUtil.IntervalType;
import com.cloud.utils.NumbersUtil;
import com.cloud.utils.component.ComponentContext;
import com.cloud.utils.component.ManagerBase;
import com.cloud.utils.concurrency.TestClock;
import com.cloud.utils.db.DB;
import com.cloud.utils.db.GlobalLock;
import com.cloud.utils.db.SearchCriteria;
import com.cloud.utils.db.TransactionLegacy;
import com.cloud.vm.snapshot.VMSnapshotManager;
import com.cloud.vm.snapshot.VMSnapshotVO;
import com.cloud.vm.snapshot.dao.VMSnapshotDao;
@Component
public class SnapshotSchedulerImpl extends ManagerBase implements SnapshotScheduler {
private static final Logger s_logger = Logger.getLogger(SnapshotSchedulerImpl.class);
@Inject
protected AsyncJobDao _asyncJobDao;
@Inject
protected SnapshotDao _snapshotDao;
@Inject
protected SnapshotScheduleDao _snapshotScheduleDao;
@Inject
protected SnapshotPolicyDao _snapshotPolicyDao;
@Inject
protected AsyncJobManager _asyncMgr;
@Inject
protected VolumeDao _volsDao;
@Inject
protected ConfigurationDao _configDao;
@Inject
protected ApiDispatcher _dispatcher;
@Inject
protected AccountDao _acctDao;
@Inject
protected SnapshotApiService _snapshotService;
@Inject
protected VMSnapshotDao _vmSnapshotDao;
@Inject
protected VMSnapshotManager _vmSnaphostManager;
@Inject
public TaggedResourceService taggedResourceService;
protected AsyncJobDispatcher _asyncDispatcher;
private static final int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 5; // 5 seconds
private int _snapshotPollInterval;
private Timer _testClockTimer;
private Date _currentTimestamp;
private TestClock _testTimerTask;
public AsyncJobDispatcher getAsyncJobDispatcher() {
return _asyncDispatcher;
}
public void setAsyncJobDispatcher(final AsyncJobDispatcher dispatcher) {
_asyncDispatcher = dispatcher;
}
private Date getNextScheduledTime(final long policyId, final Date currentTimestamp) {
final SnapshotPolicyVO policy = _snapshotPolicyDao.findById(policyId);
Date nextTimestamp = null;
if (policy != null) {
final short intervalType = policy.getInterval();
final IntervalType type = DateUtil.getIntervalType(intervalType);
final String schedule = policy.getSchedule();
final String timezone = policy.getTimezone();
nextTimestamp = DateUtil.getNextRunTime(type, schedule, timezone, currentTimestamp);
final String currentTime = DateUtil.displayDateInTimezone(DateUtil.GMT_TIMEZONE, currentTimestamp);
final String nextScheduledTime = DateUtil.displayDateInTimezone(DateUtil.GMT_TIMEZONE, nextTimestamp);
s_logger.debug("Current time is " + currentTime + ". NextScheduledTime of policyId " + policyId + " is " + nextScheduledTime);
}
return nextTimestamp;
}
/**
* {@inheritDoc}
*/
@Override
public void poll(final Date currentTimestamp) {
// We don't maintain the time. The timer task does.
_currentTimestamp = currentTimestamp;
GlobalLock scanLock = GlobalLock.getInternLock("snapshot.poll");
try {
if (scanLock.lock(ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION)) {
try {
checkStatusOfCurrentlyExecutingSnapshots();
} finally {
scanLock.unlock();
}
}
} finally {
scanLock.releaseRef();
}
scanLock = GlobalLock.getInternLock("snapshot.poll");
try {
if (scanLock.lock(ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION)) {
try {
scheduleSnapshots();
} finally {
scanLock.unlock();
}
}
} finally {
scanLock.releaseRef();
}
try {
deleteExpiredVMSnapshots();
}
catch (Exception e) {
s_logger.warn("Error in expiring vm snapshots", e);
}
}
private void checkStatusOfCurrentlyExecutingSnapshots() {
final SearchCriteria<SnapshotScheduleVO> sc = _snapshotScheduleDao.createSearchCriteria();
sc.addAnd("asyncJobId", SearchCriteria.Op.NNULL);
final List<SnapshotScheduleVO> snapshotSchedules = _snapshotScheduleDao.search(sc, null);
for (final SnapshotScheduleVO snapshotSchedule : snapshotSchedules) {
final Long asyncJobId = snapshotSchedule.getAsyncJobId();
final AsyncJobVO asyncJob = _asyncJobDao.findByIdIncludingRemoved(asyncJobId);
switch (asyncJob.getStatus()) {
case SUCCEEDED:
// The snapshot has been successfully backed up.
// The snapshot state has also been cleaned up.
// We can schedule the next job for this snapshot.
// Remove the existing entry in the snapshot_schedule table.
scheduleNextSnapshotJob(snapshotSchedule);
break;
case FAILED:
// Check the snapshot status.
final Long snapshotId = snapshotSchedule.getSnapshotId();
if (snapshotId == null) {
// createSnapshotAsync exited, successfully or unsuccessfully,
// even before creating a snapshot record
// No cleanup needs to be done.
// Schedule the next snapshot.
scheduleNextSnapshotJob(snapshotSchedule);
} else {
final SnapshotVO snapshot = _snapshotDao.findById(snapshotId);
if (snapshot == null || snapshot.getRemoved() != null) {
// This snapshot has been deleted successfully from the primary storage
// Again no cleanup needs to be done.
// Schedule the next snapshot.
// There's very little probability that the code reaches this point.
// The snapshotId is a foreign key for the snapshot_schedule table
// set to ON DELETE CASCADE. So if the snapshot entry is deleted, the snapshot_schedule entry will be too.
// But what if it has only been marked as removed?
scheduleNextSnapshotJob(snapshotSchedule);
} else {
// The management server executing this snapshot job appears to have crashed
// while creating the snapshot on primary storage/or backing it up.
// We have no idea whether the snapshot was successfully taken on the primary or not.
// Schedule the next snapshot job.
// The ValidatePreviousSnapshotCommand will take appropriate action on this snapshot
// If the snapshot was taken successfully on primary, it will retry backing it up.
// and cleanup the previous snapshot
// Set the userId to that of system.
//_snapshotManager.validateSnapshot(1L, snapshot);
// In all cases, schedule the next snapshot job
scheduleNextSnapshotJob(snapshotSchedule);
}
}
break;
case IN_PROGRESS:
// There is no way of knowing from here whether
// 1) Another management server is processing this snapshot job
// 2) The management server has crashed and this snapshot is lying
// around in an inconsistent state.
// Hopefully, this can be resolved at the backend when the current snapshot gets executed.
// But if it remains in this state, the current snapshot will not get executed.
// And it will remain in stasis.
break;
}
}
}
@DB
protected void deleteExpiredVMSnapshots() {
Date now = new Date();
List<VMSnapshotVO> vmSnapshots = _vmSnapshotDao.listAll();
for (VMSnapshotVO vmSnapshot : vmSnapshots) {
long accountId = vmSnapshot.getAccountId();
int expiration_interval_hours = VMSnapshotManager.VMSnapshotExpireInterval.valueIn(accountId);
if (expiration_interval_hours < 0 ) {
continue;
}
Date creationTime = vmSnapshot.getCreated();
long diffInHours = TimeUnit.MILLISECONDS.toHours(now.getTime() - creationTime.getTime());
if (diffInHours >= expiration_interval_hours) {
if (s_logger.isDebugEnabled()){
s_logger.debug("Deleting expired VM snapshot id: " + vmSnapshot.getId());
}
_vmSnaphostManager.deleteVMSnapshot(vmSnapshot.getId());
}
}
}
@DB
protected void scheduleSnapshots() {
String displayTime = DateUtil.displayDateInTimezone(DateUtil.GMT_TIMEZONE, _currentTimestamp);
s_logger.debug("Snapshot scheduler.poll is being called at " + displayTime);
final List<SnapshotScheduleVO> snapshotsToBeExecuted = _snapshotScheduleDao.getSchedulesToExecute(_currentTimestamp);
s_logger.debug("Got " + snapshotsToBeExecuted.size() + " snapshots to be executed at " + displayTime);
for (final SnapshotScheduleVO snapshotToBeExecuted : snapshotsToBeExecuted) {
SnapshotScheduleVO tmpSnapshotScheduleVO = null;
final long snapshotScheId = snapshotToBeExecuted.getId();
final long policyId = snapshotToBeExecuted.getPolicyId();
final long volumeId = snapshotToBeExecuted.getVolumeId();
try {
final VolumeVO volume = _volsDao.findById(volumeId);
if (volume.getPoolId() == null) {
// this volume is not attached
continue;
}
Account volAcct = _acctDao.findById(volume.getAccountId());
if (volAcct == null || volAcct.getState() == Account.State.DISABLED) {
// this account has been removed, so don't trigger recurring snapshot
if (s_logger.isDebugEnabled()) {
s_logger.debug("Skip snapshot for volume " + volume.getUuid() + " since its account has been removed or disabled");
}
continue;
}
if (_snapshotPolicyDao.findById(policyId) == null) {
_snapshotScheduleDao.remove(snapshotToBeExecuted.getId());
}
if (s_logger.isDebugEnabled()) {
final Date scheduledTimestamp = snapshotToBeExecuted.getScheduledTimestamp();
displayTime = DateUtil.displayDateInTimezone(DateUtil.GMT_TIMEZONE, scheduledTimestamp);
s_logger.debug("Scheduling 1 snapshot for volume id " + volumeId + " (volume name:" +
volume.getName() + ") for schedule id: " + snapshotToBeExecuted.getId() + " at " + displayTime);
}
tmpSnapshotScheduleVO = _snapshotScheduleDao.acquireInLockTable(snapshotScheId);
final Long eventId =
ActionEventUtils.onScheduledActionEvent(User.UID_SYSTEM, volume.getAccountId(), EventTypes.EVENT_SNAPSHOT_CREATE, "creating snapshot for volume Id:" +
volume.getUuid(), volumeId, ApiCommandResourceType.Volume.toString(), true, 0);
final Map<String, String> params = new HashMap<String, String>();
params.put(ApiConstants.VOLUME_ID, "" + volumeId);
params.put(ApiConstants.POLICY_ID, "" + policyId);
params.put("ctxUserId", "1");
params.put("ctxAccountId", "" + volume.getAccountId());
params.put("ctxStartEventId", String.valueOf(eventId));
List<? extends ResourceTag> resourceTags = taggedResourceService.listByResourceTypeAndId(ResourceTag.ResourceObjectType.SnapshotPolicy, policyId);
if (resourceTags != null && !resourceTags.isEmpty()) {
int tagNumber = 0;
for (ResourceTag resourceTag : resourceTags) {
params.put("tags[" + tagNumber + "].key", resourceTag.getKey());
params.put("tags[" + tagNumber + "].value", resourceTag.getValue());
tagNumber++;
}
}
final CreateSnapshotCmd cmd = new CreateSnapshotCmd();
ComponentContext.inject(cmd);
_dispatcher.dispatchCreateCmd(cmd, params);
params.put("id", "" + cmd.getEntityId());
params.put("ctxStartEventId", "1");
AsyncJobVO job = new AsyncJobVO("", User.UID_SYSTEM, volume.getAccountId(), CreateSnapshotCmd.class.getName(),
ApiGsonHelper.getBuilder().create().toJson(params), cmd.getEntityId(),
cmd.getApiResourceType() != null ? cmd.getApiResourceType().toString() : null, null);
job.setDispatcher(_asyncDispatcher.getName());
final long jobId = _asyncMgr.submitAsyncJob(job);
tmpSnapshotScheduleVO.setAsyncJobId(jobId);
_snapshotScheduleDao.update(snapshotScheId, tmpSnapshotScheduleVO);
} catch (final Exception e) {
// TODO Logging this exception is enough?
s_logger.warn("Scheduling snapshot failed due to " + e.toString());
} finally {
if (tmpSnapshotScheduleVO != null) {
_snapshotScheduleDao.releaseFromLockTable(snapshotScheId);
}
}
}
}
private Date scheduleNextSnapshotJob(final SnapshotScheduleVO snapshotSchedule) {
if (snapshotSchedule == null) {
return null;
}
final Long policyId = snapshotSchedule.getPolicyId();
if (policyId.longValue() == Snapshot.MANUAL_POLICY_ID) {
// Don't need to schedule the next job for this.
return null;
}
final SnapshotPolicyVO snapshotPolicy = _snapshotPolicyDao.findById(policyId);
if (snapshotPolicy == null) {
_snapshotScheduleDao.expunge(snapshotSchedule.getId());
}
return scheduleNextSnapshotJob(snapshotPolicy);
}
@Override
@DB
public Date scheduleNextSnapshotJob(final SnapshotPolicyVO policy) {
if (policy == null) {
return null;
}
// If display attribute is false then remove schedules if any and return.
if(!policy.isDisplay()){
removeSchedule(policy.getVolumeId(), policy.getId());
return null;
}
final long policyId = policy.getId();
if (policyId == Snapshot.MANUAL_POLICY_ID) {
return null;
}
if (_volsDao.findById(policy.getVolumeId()) == null) {
s_logger.warn("Found snapshot policy ID: " + policyId + " for volume ID: " + policy.getVolumeId() + " that does not exist or has been removed");
removeSchedule(policy.getVolumeId(), policy.getId());
return null;
}
final Date nextSnapshotTimestamp = getNextScheduledTime(policyId, _currentTimestamp);
SnapshotScheduleVO spstSchedVO = _snapshotScheduleDao.findOneByVolumePolicy(policy.getVolumeId(), policy.getId());
if (spstSchedVO == null) {
spstSchedVO = new SnapshotScheduleVO(policy.getVolumeId(), policyId, nextSnapshotTimestamp);
_snapshotScheduleDao.persist(spstSchedVO);
} else {
TransactionLegacy txn = TransactionLegacy.open(TransactionLegacy.CLOUD_DB);
try {
spstSchedVO = _snapshotScheduleDao.acquireInLockTable(spstSchedVO.getId());
spstSchedVO.setPolicyId(policyId);
spstSchedVO.setScheduledTimestamp(nextSnapshotTimestamp);
spstSchedVO.setAsyncJobId(null);
spstSchedVO.setSnapshotId(null);
_snapshotScheduleDao.update(spstSchedVO.getId(), spstSchedVO);
txn.commit();
} finally {
if (spstSchedVO != null) {
_snapshotScheduleDao.releaseFromLockTable(spstSchedVO.getId());
}
txn.close();
}
}
return nextSnapshotTimestamp;
}
@Override
public void scheduleOrCancelNextSnapshotJobOnDisplayChange(final SnapshotPolicyVO policy, boolean previousDisplay) {
// Take action only if display changed
if(policy.isDisplay() != previousDisplay ){
if(policy.isDisplay()){
scheduleNextSnapshotJob(policy);
}else{
removeSchedule(policy.getVolumeId(), policy.getId());
}
}
}
@Override
@DB
public boolean removeSchedule(final Long volumeId, final Long policyId) {
// We can only remove schedules which are in the future. Not which are already executed in the past.
final SnapshotScheduleVO schedule = _snapshotScheduleDao.getCurrentSchedule(volumeId, policyId, false);
boolean success = true;
if (schedule != null) {
success = _snapshotScheduleDao.remove(schedule.getId());
}
if (!success) {
s_logger.debug("Error while deleting Snapshot schedule with Id: " + schedule.getId());
}
return success;
}
@Override
public boolean configure(final String name, final Map<String, Object> params) throws ConfigurationException {
_snapshotPollInterval = NumbersUtil.parseInt(_configDao.getValue("snapshot.poll.interval"), 300);
final boolean snapshotsRecurringTest = Boolean.parseBoolean(_configDao.getValue("snapshot.recurring.test"));
if (snapshotsRecurringTest) {
// look for some test values in the configuration table so that snapshots can be taken more frequently (QA test code)
final int minutesPerHour = NumbersUtil.parseInt(_configDao.getValue("snapshot.test.minutes.per.hour"), 60);
final int hoursPerDay = NumbersUtil.parseInt(_configDao.getValue("snapshot.test.hours.per.day"), 24);
final int daysPerWeek = NumbersUtil.parseInt(_configDao.getValue("snapshot.test.days.per.week"), 7);
final int daysPerMonth = NumbersUtil.parseInt(_configDao.getValue("snapshot.test.days.per.month"), 30);
final int weeksPerMonth = NumbersUtil.parseInt(_configDao.getValue("snapshot.test.weeks.per.month"), 4);
final int monthsPerYear = NumbersUtil.parseInt(_configDao.getValue("snapshot.test.months.per.year"), 12);
_testTimerTask = new TestClock(this, minutesPerHour, hoursPerDay, daysPerWeek, daysPerMonth, weeksPerMonth, monthsPerYear);
}
_currentTimestamp = new Date();
s_logger.info("Snapshot Scheduler is configured.");
return true;
}
@Override
@DB
public boolean start() {
// reschedule all policies after management restart
final List<SnapshotPolicyVO> policyInstances = _snapshotPolicyDao.listAll();
for (final SnapshotPolicyVO policyInstance : policyInstances) {
if (policyInstance.getId() != Snapshot.MANUAL_POLICY_ID) {
scheduleNextSnapshotJob(policyInstance);
}
}
if (_testTimerTask != null) {
_testClockTimer = new Timer("TestClock");
// Run the test clock every 60s. Because every tick is counted as 1 minute.
// Else it becomes too confusing.
_testClockTimer.schedule(_testTimerTask, 100 * 1000L, 60 * 1000L);
} else {
final TimerTask timerTask = new ManagedContextTimerTask() {
@Override
protected void runInContext() {
try {
final Date currentTimestamp = new Date();
poll(currentTimestamp);
} catch (final Throwable t) {
s_logger.warn("Catch throwable in snapshot scheduler ", t);
}
}
};
_testClockTimer = new Timer("SnapshotPollTask");
_testClockTimer.schedule(timerTask, _snapshotPollInterval * 1000L, _snapshotPollInterval * 1000L);
}
return true;
}
@Override
public boolean stop() {
return true;
}
}