blob: 92a2acb9d4f3b36a94f4b69c7aed9bbe86c0f02f [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.cloudstack.framework.jobs.impl;
import static com.cloud.utils.HumanReadableJson.getHumanReadableBytesJson;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import javax.naming.ConfigurationException;
import org.apache.cloudstack.api.ApiCommandResourceType;
import org.apache.cloudstack.api.ApiErrorCode;
import org.apache.cloudstack.context.CallContext;
import org.apache.cloudstack.engine.orchestration.service.NetworkOrchestrationService;
import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotDataFactory;
import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotInfo;
import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotService;
import org.apache.cloudstack.engine.subsystem.api.storage.VolumeDataFactory;
import org.apache.cloudstack.engine.subsystem.api.storage.VolumeInfo;
import org.apache.cloudstack.engine.subsystem.api.storage.VolumeService;
import org.apache.cloudstack.framework.config.ConfigKey;
import org.apache.cloudstack.framework.config.Configurable;
import org.apache.cloudstack.framework.jobs.AsyncJob;
import org.apache.cloudstack.framework.jobs.AsyncJobDispatcher;
import org.apache.cloudstack.framework.jobs.AsyncJobExecutionContext;
import org.apache.cloudstack.framework.jobs.AsyncJobManager;
import org.apache.cloudstack.framework.jobs.dao.AsyncJobDao;
import org.apache.cloudstack.framework.jobs.dao.AsyncJobJoinMapDao;
import org.apache.cloudstack.framework.jobs.dao.AsyncJobJournalDao;
import org.apache.cloudstack.framework.jobs.dao.SyncQueueItemDao;
import org.apache.cloudstack.framework.jobs.dao.VmWorkJobDao;
import org.apache.cloudstack.framework.messagebus.MessageBus;
import org.apache.cloudstack.framework.messagebus.MessageDetector;
import org.apache.cloudstack.framework.messagebus.PublishScope;
import org.apache.cloudstack.jobs.JobInfo;
import org.apache.cloudstack.jobs.JobInfo.Status;
import org.apache.cloudstack.managed.context.ManagedContextRunnable;
import org.apache.cloudstack.management.ManagementServerHost;
import org.apache.cloudstack.utils.identity.ManagementServerNode;
import com.cloud.cluster.ClusterManagerListener;
import com.cloud.network.Network;
import com.cloud.network.dao.NetworkDao;
import com.cloud.network.dao.NetworkVO;
import com.cloud.storage.Snapshot;
import com.cloud.storage.Volume;
import com.cloud.storage.VolumeDetailVO;
import com.cloud.storage.dao.SnapshotDao;
import com.cloud.storage.dao.SnapshotDetailsDao;
import com.cloud.storage.dao.SnapshotDetailsVO;
import com.cloud.storage.dao.VolumeDao;
import com.cloud.storage.dao.VolumeDetailsDao;
import com.cloud.utils.DateUtil;
import com.cloud.utils.Pair;
import com.cloud.utils.Predicate;
import com.cloud.utils.StringUtils;
import com.cloud.utils.component.ComponentLifecycle;
import com.cloud.utils.component.ManagerBase;
import com.cloud.utils.concurrency.NamedThreadFactory;
import com.cloud.utils.db.DB;
import com.cloud.utils.db.DbProperties;
import com.cloud.utils.db.GenericDao;
import com.cloud.utils.db.GenericDaoBase;
import com.cloud.utils.db.GenericSearchBuilder;
import com.cloud.utils.db.GlobalLock;
import com.cloud.utils.db.SearchBuilder;
import com.cloud.utils.db.SearchCriteria;
import com.cloud.utils.db.SearchCriteria.Op;
import com.cloud.utils.db.Transaction;
import com.cloud.utils.db.TransactionCallback;
import com.cloud.utils.db.TransactionCallbackNoReturn;
import com.cloud.utils.db.TransactionStatus;
import com.cloud.utils.exception.CloudRuntimeException;
import com.cloud.utils.exception.ExceptionUtil;
import com.cloud.utils.fsm.NoTransitionException;
import com.cloud.utils.mgmt.JmxUtil;
import com.cloud.vm.VMInstanceVO;
import com.cloud.vm.VirtualMachine;
import com.cloud.vm.VirtualMachineManager;
import com.cloud.vm.dao.VMInstanceDao;
import org.apache.logging.log4j.ThreadContext;
public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, ClusterManagerListener, Configurable {
// Advanced
public static final ConfigKey<Long> JobExpireMinutes = new ConfigKey<Long>("Advanced", Long.class, "job.expire.minutes", "1440",
"Time (in minutes) for async-jobs to be kept in system", true, ConfigKey.Scope.Global);
public static final ConfigKey<Long> JobCancelThresholdMinutes = new ConfigKey<Long>("Advanced", Long.class, "job.cancel.threshold.minutes", "60",
"Time (in minutes) for async-jobs to be forcely cancelled if it has been in process for long", true, ConfigKey.Scope.Global);
private static final ConfigKey<Integer> VmJobLockTimeout = new ConfigKey<Integer>("Advanced",
Integer.class, "vm.job.lock.timeout", "1800",
"Time in seconds to wait in acquiring lock to submit a vm worker job", false);
private static final ConfigKey<Boolean> HidePassword = new ConfigKey<Boolean>("Advanced", Boolean.class, "log.hide.password", "true", "If set to true, the password is hidden", true, ConfigKey.Scope.Global);
private static final int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 3; // 3 seconds
private static final int MAX_ONETIME_SCHEDULE_SIZE = 50;
private static final int HEARTBEAT_INTERVAL = 2000;
private static final int GC_INTERVAL = 10000; // 10 seconds
@Inject
private SyncQueueItemDao _queueItemDao;
@Inject
private SyncQueueManager _queueMgr;
@Inject
private AsyncJobDao _jobDao;
@Inject
private AsyncJobJournalDao _journalDao;
@Inject
private AsyncJobJoinMapDao _joinMapDao;
@Inject
private List<AsyncJobDispatcher> _jobDispatchers;
@Inject
private MessageBus _messageBus;
@Inject
private AsyncJobMonitor _jobMonitor;
@Inject
private VMInstanceDao _vmInstanceDao;
@Inject
private VmWorkJobDao _vmWorkJobDao;
@Inject
private VolumeDetailsDao _volumeDetailsDao;
@Inject
private VolumeDao _volsDao;
@Inject
private SnapshotDao _snapshotDao;
@Inject
private SnapshotService snapshotSrv;
@Inject
private SnapshotDataFactory snapshotFactory;
@Inject
private SnapshotDetailsDao _snapshotDetailsDao;
@Inject
private VolumeDataFactory volFactory;
@Inject
private VirtualMachineManager virtualMachineManager;
@Inject
private NetworkDao networkDao;
@Inject
private NetworkOrchestrationService networkOrchestrationService;
private volatile long _executionRunNumber = 1;
private final ScheduledExecutorService _heartbeatScheduler = Executors.newScheduledThreadPool(1, new NamedThreadFactory("AsyncJobMgr-Heartbeat"));
private ExecutorService _apiJobExecutor;
private ExecutorService _workerJobExecutor;
private boolean asyncJobsEnabled = true;
@Override
public String getConfigComponentName() {
return AsyncJobManager.class.getSimpleName();
}
@Override
public ConfigKey<?>[] getConfigKeys() {
return new ConfigKey<?>[] {JobExpireMinutes, JobCancelThresholdMinutes, VmJobLockTimeout, HidePassword};
}
@Override
public AsyncJobVO getAsyncJob(long jobId) {
return _jobDao.findByIdIncludingRemoved(jobId);
}
@Override
public List<AsyncJobVO> findInstancePendingAsyncJobs(String instanceType, Long accountId) {
return _jobDao.findInstancePendingAsyncJobs(instanceType, accountId);
}
@Override
@DB
public AsyncJob getPseudoJob(long accountId, long userId) {
AsyncJobVO job = _jobDao.findPseudoJob(Thread.currentThread().getId(), getMsid());
if (job == null) {
job = new AsyncJobVO();
job.setAccountId(accountId);
job.setUserId(userId);
job.setInitMsid(getMsid());
job.setDispatcher(AsyncJobVO.JOB_DISPATCHER_PSEUDO);
job.setInstanceType(AsyncJobVO.PSEUDO_JOB_INSTANCE_TYPE);
job.setInstanceId(Thread.currentThread().getId());
_jobDao.persist(job);
}
return job;
}
@Override
public long submitAsyncJob(AsyncJob job) {
return submitAsyncJob(job, false);
}
private void checkShutdown() {
if (!isAsyncJobsEnabled()) {
throw new CloudRuntimeException("A shutdown has been triggered. Can not accept new jobs");
}
}
@SuppressWarnings("unchecked")
@DB
public long submitAsyncJob(AsyncJob job, boolean scheduleJobExecutionInContext) {
checkShutdown();
@SuppressWarnings("rawtypes")
GenericDao dao = GenericDaoBase.getDao(job.getClass());
job.setInitMsid(getMsid());
job.setExecutingMsid(getMsid());
job.setSyncSource(null); // no sync source originally
dao.persist(job);
publishOnEventBus(job, "submit");
scheduleExecution(job, scheduleJobExecutionInContext);
if (logger.isDebugEnabled()) {
logger.debug("submit async job-" + job.getId() + ", details: " + StringUtils.cleanString(job.toString()));
}
return job.getId();
}
@SuppressWarnings("unchecked")
@Override
@DB
public long submitAsyncJob(final AsyncJob job, final String syncObjType, final long syncObjId) {
checkShutdown();
try {
@SuppressWarnings("rawtypes")
final GenericDao dao = GenericDaoBase.getDao(job.getClass());
if (dao == null) {
throw new CloudRuntimeException(String.format("Failed to get dao from job's class=%s, for job id=%d, cmd=%s", job.getClass(), job.getId(), job.getCmd()));
}
publishOnEventBus(job, "submit");
if (!_vmInstanceDao.lockInLockTable(String.valueOf(syncObjId), VmJobLockTimeout.value())){
throw new CloudRuntimeException("Failed to acquire lock in submitting async job: " + job.getCmd() + " with timeout value = " + VmJobLockTimeout.value());
}
try {
// lock is acquired
return Transaction.execute(new TransactionCallback<Long>() {
@Override
public Long doInTransaction(TransactionStatus status) {
job.setInitMsid(getMsid());
dao.persist(job);
syncAsyncJobExecution(job, syncObjType, syncObjId, 1);
return job.getId();
}
});
} finally {
_vmInstanceDao.unlockFromLockTable(String.valueOf(syncObjId));
}
} catch (Exception e) {
String errMsg = "Unable to schedule async job for command " + job.getCmd() + ", unexpected exception.";
logger.warn(errMsg, e);
throw new CloudRuntimeException(errMsg);
}
}
@Override
@DB
public void completeAsyncJob(final long jobId, final Status jobStatus, final int resultCode, final String resultObject) {
String resultObj = null;
if (logger.isDebugEnabled()) {
resultObj = convertHumanReadableJson(obfuscatePassword(resultObject, HidePassword.value()));
logger.debug("Complete async job-" + jobId + ", jobStatus: " + jobStatus + ", resultCode: " + resultCode + ", result: " + resultObj);
}
final AsyncJobVO job = _jobDao.findById(jobId);
if (job == null) {
if (logger.isDebugEnabled()) {
logger.debug("job-" + jobId + " no longer exists, we just log completion info here. " + jobStatus + ", resultCode: " + resultCode + ", result: " +
resultObj);
}
// still purge item from queue to avoid any blocking
_queueMgr.purgeAsyncJobQueueItemId(jobId);
return;
}
if (job.getStatus() != JobInfo.Status.IN_PROGRESS) {
if (logger.isDebugEnabled()) {
logger.debug("job-" + jobId + " is already completed.");
}
// still purge item from queue to avoid any blocking
_queueMgr.purgeAsyncJobQueueItemId(jobId);
return;
}
if (resultObject != null) {
job.setResult(resultObject);
}
if (logger.isDebugEnabled()) {
logger.debug("Publish async job-" + jobId + " complete on message bus");
}
if (logger.isDebugEnabled()) {
logger.debug("Wake up jobs related to job-" + jobId);
}
final List<Long> wakeupList = Transaction.execute(new TransactionCallback<List<Long>>() {
@Override
public List<Long> doInTransaction(final TransactionStatus status) {
if (logger.isDebugEnabled()) {
logger.debug("Update db status for job-" + jobId);
}
job.setCompleteMsid(getMsid());
job.setStatus(jobStatus);
job.setResultCode(resultCode);
if (resultObject != null) {
job.setResult(resultObject);
} else {
job.setResult(null);
}
final Date currentGMTTime = DateUtil.currentGMTTime();
job.setLastUpdated(currentGMTTime);
job.setRemoved(currentGMTTime);
job.setExecutingMsid(null);
_jobDao.update(jobId, job);
if (logger.isDebugEnabled()) {
logger.debug("Wake up jobs joined with job-" + jobId + " and disjoin all subjobs created from job- " + jobId);
}
final List<Long> wakeupList = wakeupByJoinedJobCompletion(jobId);
_joinMapDao.disjoinAllJobs(jobId);
// purge the job sync item from queue
_queueMgr.purgeAsyncJobQueueItemId(jobId);
return wakeupList;
}
});
publishOnEventBus(job, "complete"); // publish before the instance type and ID are wiped out
//
// disable wakeup scheduling now, since all API jobs are currently using block-waiting for sub-jobs
//
/*
for (Long id : wakeupList) {
// TODO, we assume that all jobs in this category is API job only
AsyncJobVO jobToWakeup = _jobDao.findById(id);
if (jobToWakeup != null && (jobToWakeup.getPendingSignals() & AsyncJob.Constants.SIGNAL_MASK_WAKEUP) != 0)
scheduleExecution(jobToWakeup, false);
}
*/
_messageBus.publish(null, AsyncJob.Topics.JOB_STATE, PublishScope.GLOBAL, jobId);
}
private String convertHumanReadableJson(String resultObj) {
if (resultObj != null && resultObj.contains("/") && resultObj.contains("{")){
resultObj = resultObj.substring(0, resultObj.indexOf("{")) + getHumanReadableBytesJson(resultObj.substring(resultObj.indexOf("{")));
}
return resultObj;
}
@Override
@DB
public void updateAsyncJobStatus(final long jobId, final int processStatus, final String resultObject) {
if (logger.isDebugEnabled()) {
logger.debug("Update async-job progress, job-" + jobId + ", processStatus: " + processStatus + ", result: " + resultObject);
}
final AsyncJobVO job = _jobDao.findById(jobId);
if (job == null) {
if (logger.isDebugEnabled()) {
logger.debug("job-" + jobId + " no longer exists, we just log progress info here. progress status: " + processStatus);
}
return;
}
publishOnEventBus(job, "update");
Transaction.execute(new TransactionCallbackNoReturn() {
@Override
public void doInTransactionWithoutResult(TransactionStatus status) {
job.setProcessStatus(processStatus);
if (resultObject != null) {
job.setResult(resultObject);
}
job.setLastUpdated(DateUtil.currentGMTTime());
_jobDao.update(jobId, job);
}
});
}
@Override
@DB
public void updateAsyncJobAttachment(final long jobId, final String instanceType, final Long instanceId) {
if (logger.isDebugEnabled()) {
logger.debug("Update async-job attachment, job-" + jobId + ", instanceType: " + instanceType + ", instanceId: " + instanceId);
}
final AsyncJobVO job = _jobDao.findById(jobId);
publishOnEventBus(job, "update");
Transaction.execute(new TransactionCallbackNoReturn() {
@Override
public void doInTransactionWithoutResult(TransactionStatus status) {
AsyncJobVO job = _jobDao.createForUpdate();
job.setInstanceType(instanceType);
job.setInstanceId(instanceId);
job.setLastUpdated(DateUtil.currentGMTTime());
_jobDao.update(jobId, job);
}
});
}
@Override
@DB
public void logJobJournal(long jobId, AsyncJob.JournalType journalType, String journalText, String journalObjJson) {
AsyncJobJournalVO journal = new AsyncJobJournalVO();
journal.setJobId(jobId);
journal.setJournalType(journalType);
journal.setJournalText(journalText);
journal.setJournalObjJsonString(journalObjJson);
_journalDao.persist(journal);
}
@Override
@DB
public void joinJob(long jobId, long joinJobId) {
_joinMapDao.joinJob(jobId, joinJobId, getMsid(), 0, 0, null, null, null);
}
@Override
@DB
public void joinJob(long jobId, long joinJobId, String wakeupHandler, String wakeupDispatcher, String[] wakeupTopcisOnMessageBus, long wakeupIntervalInMilliSeconds,
long timeoutInMilliSeconds) {
Long syncSourceId = null;
AsyncJobExecutionContext context = AsyncJobExecutionContext.getCurrentExecutionContext();
assert (context.getJob() != null);
if (context.getJob().getSyncSource() != null) {
syncSourceId = context.getJob().getSyncSource().getQueueId();
}
_joinMapDao.joinJob(jobId, joinJobId, getMsid(), wakeupIntervalInMilliSeconds, timeoutInMilliSeconds, syncSourceId, wakeupHandler, wakeupDispatcher);
}
@Override
@DB
public void disjoinJob(long jobId, long joinedJobId) {
_joinMapDao.disjoinJob(jobId, joinedJobId);
}
@Override
@DB
public void completeJoin(long joinJobId, JobInfo.Status joinStatus, String joinResult) {
_joinMapDao.completeJoin(joinJobId, joinStatus, joinResult, getMsid());
}
@Override
public void syncAsyncJobExecution(AsyncJob job, String syncObjType, long syncObjId, long queueSizeLimit) {
if (logger.isDebugEnabled()) {
logger.debug("Sync job-" + job.getId() + " execution on object " + syncObjType + "." + syncObjId);
}
SyncQueueVO queue = null;
queue = _queueMgr.queue(syncObjType, syncObjId, SyncQueueItem.AsyncJobContentType, job.getId(), queueSizeLimit);
if (queue == null)
throw new CloudRuntimeException("Unable to insert queue item into database, DB is full?");
}
@Override
public AsyncJob queryJob(final long jobId, final boolean updatePollTime) {
final AsyncJobVO job = _jobDao.findByIdIncludingRemoved(jobId);
if (updatePollTime) {
job.setLastPolled(DateUtil.currentGMTTime());
_jobDao.update(jobId, job);
}
return job;
}
public String obfuscatePassword(String result, boolean hidePassword) {
if (hidePassword) {
String pattern = "\"password\":";
if (result != null) {
if (result.contains(pattern)) {
String[] resp = result.split(pattern);
String psswd = resp[1].toString().split(",")[0];
if (psswd.endsWith("}")) {
psswd = psswd.substring(0, psswd.length() - 1);
result = resp[0] + pattern + psswd.replace(psswd.substring(2, psswd.length() - 1), "*****") + "}," + resp[1].split(",", 2)[1];
} else {
result = resp[0] + pattern + psswd.replace(psswd.substring(2, psswd.length() - 1), "*****") + "," + resp[1].split(",", 2)[1];
}
}
}
}
return result;
}
private void scheduleExecution(final AsyncJobVO job) {
scheduleExecution(job, false);
}
private void scheduleExecution(final AsyncJob job, boolean executeInContext) {
Runnable runnable = getExecutorRunnable(job);
if (executeInContext) {
runnable.run();
} else {
if (job.getDispatcher() == null || job.getDispatcher().equalsIgnoreCase("ApiAsyncJobDispatcher"))
_apiJobExecutor.submit(runnable);
else
_workerJobExecutor.submit(runnable);
}
}
private AsyncJobDispatcher getDispatcher(String dispatcherName) {
assert (dispatcherName != null && !dispatcherName.isEmpty()) : "Who's not setting the dispatcher when submitting a job? Who am I suppose to call if you do that!";
for (AsyncJobDispatcher dispatcher : _jobDispatchers) {
if (dispatcherName.equals(dispatcher.getName()))
return dispatcher;
}
throw new CloudRuntimeException("Unable to find dispatcher name: " + dispatcherName);
}
private AsyncJobDispatcher findWakeupDispatcher(AsyncJob job) {
if (_jobDispatchers != null) {
List<AsyncJobJoinMapVO> joinRecords = _joinMapDao.listJoinRecords(job.getId());
if (joinRecords.size() > 0) {
AsyncJobJoinMapVO joinRecord = joinRecords.get(0);
for (AsyncJobDispatcher dispatcher : _jobDispatchers) {
if (dispatcher.getName().equals(joinRecord.getWakeupDispatcher()))
return dispatcher;
}
} else {
logger.warn("job-" + job.getId() + " is scheduled for wakeup run, but there is no joining info anymore");
}
}
return null;
}
private long getJobRunNumber() {
synchronized (this) {
return _executionRunNumber++;
}
}
private Runnable getExecutorRunnable(final AsyncJob job) {
return new ManagedContextRunnable() {
@Override
public void run() {
// register place-holder context to avoid installing system account call context
if (CallContext.current() == null)
CallContext.registerPlaceHolderContext();
String related = job.getRelated();
String logContext = job.getShortUuid();
if (related != null && !related.isEmpty()) {
ThreadContext.push("job-" + related + "/" + "job-" + job.getId());
AsyncJob relatedJob = _jobDao.findByIdIncludingRemoved(Long.parseLong(related));
if (relatedJob != null) {
logContext = relatedJob.getShortUuid();
}
} else {
ThreadContext.push("job-" + job.getId());
}
ThreadContext.put("logcontextid", logContext);
try {
super.run();
} finally {
ThreadContext.pop();
}
}
@Override
protected void runInContext() {
long runNumber = getJobRunNumber();
try {
//
// setup execution environment
//
try {
JmxUtil.registerMBean("AsyncJobManager", "Active Job " + job.getId(), new AsyncJobMBeanImpl(job));
} catch (Exception e) {
// Due to co-existence of normal-dispatched-job/wakeup-dispatched-job, MBean register() call
// is expected to fail under situations
if (logger.isTraceEnabled())
logger.trace("Unable to register active job " + job.getId() + " to JMX monitoring due to exception " + ExceptionUtil.toString(e));
}
_jobMonitor.registerActiveTask(runNumber, job.getId());
AsyncJobExecutionContext.setCurrentExecutionContext(new AsyncJobExecutionContext(job));
String related = job.getRelated();
String logContext = job.getShortUuid();
if (related != null && !related.isEmpty()) {
AsyncJob relatedJob = _jobDao.findByIdIncludingRemoved(Long.parseLong(related));
if (relatedJob != null) {
logContext = relatedJob.getShortUuid();
}
}
ThreadContext.put("logcontextid", logContext);
// execute the job
if (logger.isDebugEnabled()) {
logger.debug("Executing " + StringUtils.cleanString(job.toString()));
}
if ((getAndResetPendingSignals(job) & AsyncJob.Constants.SIGNAL_MASK_WAKEUP) != 0) {
AsyncJobDispatcher jobDispatcher = findWakeupDispatcher(job);
if (jobDispatcher != null) {
jobDispatcher.runJob(job);
} else {
// TODO, job wakeup is not in use yet
if (logger.isTraceEnabled())
logger.trace("Unable to find a wakeup dispatcher from the joined job: " + job);
}
} else {
AsyncJobDispatcher jobDispatcher = getDispatcher(job.getDispatcher());
if (jobDispatcher != null) {
jobDispatcher.runJob(job);
} else {
logger.error("Unable to find job dispatcher, job will be cancelled");
completeAsyncJob(job.getId(), JobInfo.Status.FAILED, ApiErrorCode.INTERNAL_ERROR.getHttpCode(), null);
}
}
if (logger.isDebugEnabled()) {
logger.debug("Done executing " + job.getCmd() + " for job-" + job.getId());
}
} catch (Throwable e) {
logger.error("Unexpected exception", e);
completeAsyncJob(job.getId(), JobInfo.Status.FAILED, ApiErrorCode.INTERNAL_ERROR.getHttpCode(), null);
} finally {
// guard final clause as well
try {
if (job.getSyncSource() != null) {
// here check queue item one more time to double make sure that queue item is removed in case of any uncaught exception
_queueMgr.purgeItem(job.getSyncSource().getId());
}
try {
JmxUtil.unregisterMBean("AsyncJobManager", "Active Job " + job.getId());
} catch (Exception e) {
// Due to co-existence of normal-dispatched-job/wakeup-dispatched-job, MBean unregister() call
// is expected to fail under situations
if (logger.isTraceEnabled())
logger.trace("Unable to unregister job " + job.getId() + " to JMX monitoring due to exception " + ExceptionUtil.toString(e));
}
//
// clean execution environment
//
AsyncJobExecutionContext.unregister();
_jobMonitor.unregisterActiveTask(runNumber);
} catch (Throwable e) {
logger.error("Double exception", e);
}
}
}
};
}
private int getAndResetPendingSignals(AsyncJob job) {
int signals = job.getPendingSignals();
if (signals != 0) {
AsyncJobVO jobRecord = _jobDao.findById(job.getId());
jobRecord.setPendingSignals(0);
_jobDao.update(job.getId(), jobRecord);
}
return signals;
}
private void executeQueueItem(SyncQueueItemVO item, boolean fromPreviousSession) {
AsyncJobVO job = _jobDao.findById(item.getContentId());
if (job != null) {
if (logger.isDebugEnabled()) {
logger.debug("Schedule queued job-" + job.getId());
}
job.setSyncSource(item);
//
// TODO: a temporary solution to work-around DB deadlock situation
//
// to live with DB deadlocks, we will give a chance for job to be rescheduled
// in case of exceptions (most-likely DB deadlock exceptions)
try {
job.setExecutingMsid(getMsid());
_jobDao.update(job.getId(), job);
} catch (Exception e) {
logger.warn("Unexpected exception while dispatching job-" + item.getContentId(), e);
try {
_queueMgr.returnItem(item.getId());
} catch (Throwable thr) {
logger.error("Unexpected exception while returning job-" + item.getContentId() + " to queue", thr);
}
}
try {
scheduleExecution(job);
} catch (RejectedExecutionException e) {
logger.warn("Execution for job-" + job.getId() + " is rejected, return it to the queue for next turn");
try {
_queueMgr.returnItem(item.getId());
} catch (Exception e2) {
logger.error("Unexpected exception while returning job-" + item.getContentId() + " to queue", e2);
}
try {
job.setExecutingMsid(null);
_jobDao.update(job.getId(), job);
} catch (Exception e3) {
logger.warn("Unexpected exception while update job-" + item.getContentId() + " msid for bookkeeping");
}
}
} else {
if (logger.isDebugEnabled()) {
logger.debug("Unable to find related job for queue item: " + item.toString());
}
_queueMgr.purgeItem(item.getId());
}
}
@Override
public void releaseSyncSource() {
AsyncJobExecutionContext executionContext = AsyncJobExecutionContext.getCurrentExecutionContext();
assert (executionContext != null);
if (executionContext.getSyncSource() != null) {
if (logger.isDebugEnabled()) {
logger.debug("Release sync source for job-" + executionContext.getJob().getId() + " sync source: " + executionContext.getSyncSource().getContentType() +
"-" + executionContext.getSyncSource().getContentId());
}
_queueMgr.purgeItem(executionContext.getSyncSource().getId());
checkQueue(executionContext.getSyncSource().getQueueId());
}
}
@Override
public boolean waitAndCheck(AsyncJob job, String[] wakeupTopicsOnMessageBus, long checkIntervalInMilliSeconds, long timeoutInMilliseconds, Predicate predicate) {
MessageDetector msgDetector = new MessageDetector();
String[] topics = Arrays.copyOf(wakeupTopicsOnMessageBus, wakeupTopicsOnMessageBus.length + 1);
topics[topics.length - 1] = AsyncJob.Topics.JOB_STATE;
msgDetector.open(_messageBus, topics);
try {
long startTick = System.currentTimeMillis();
while (timeoutInMilliseconds < 0 || System.currentTimeMillis() - startTick < timeoutInMilliseconds) {
msgDetector.waitAny(checkIntervalInMilliSeconds);
job = _jobDao.findById(job.getId());
if (job != null && job.getStatus().done()) {
return true;
}
if (predicate.checkCondition()) {
return true;
}
}
} finally {
msgDetector.close();
}
return false;
}
@Override
public String marshallResultObject(Serializable obj) {
if (obj != null)
return JobSerializerHelper.toObjectSerializedString(obj);
return null;
}
@Override
public Object unmarshallResultObject(AsyncJob job) {
if(job != null && job.getResult() != null)
return JobSerializerHelper.fromObjectSerializedString(job.getResult());
return null;
}
private void checkQueue(long queueId) {
while (true) {
try {
SyncQueueItemVO item = _queueMgr.dequeueFromOne(queueId, getMsid());
if (item != null) {
if (logger.isDebugEnabled()) {
logger.debug("Executing sync queue item: " + item.toString());
}
executeQueueItem(item, false);
} else {
break;
}
} catch (Throwable e) {
logger.error("Unexpected exception when kicking sync queue-" + queueId, e);
break;
}
}
}
private Runnable getHeartbeatTask() {
return new ManagedContextRunnable() {
@Override
protected void runInContext() {
GlobalLock scanLock = GlobalLock.getInternLock("AsyncJobManagerHeartbeat");
try {
if (scanLock.lock(ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION)) {
try {
reallyRun();
} finally {
scanLock.unlock();
}
}
} finally {
scanLock.releaseRef();
}
}
protected void reallyRun() {
try {
if (!isAsyncJobsEnabled()) {
logger.info("A shutdown has been triggered. Not executing any async job");
return;
}
List<SyncQueueItemVO> l = _queueMgr.dequeueFromAny(getMsid(), MAX_ONETIME_SCHEDULE_SIZE);
if (l != null && l.size() > 0) {
for (SyncQueueItemVO item : l) {
if (logger.isDebugEnabled()) {
logger.debug("Execute sync-queue item: " + item.toString());
}
executeQueueItem(item, false);
}
}
List<Long> standaloneWakeupJobs = wakeupScan();
for (Long jobId : standaloneWakeupJobs) {
// TODO, we assume that all jobs in this category is API job only
AsyncJobVO job = _jobDao.findById(jobId);
if (job != null && (job.getPendingSignals() & AsyncJob.Constants.SIGNAL_MASK_WAKEUP) != 0)
scheduleExecution(job, false);
}
} catch (Throwable e) {
logger.error("Unexpected exception when trying to execute queue item, ", e);
}
}
};
}
@DB
private Runnable getGCTask() {
return new ManagedContextRunnable() {
@Override
protected void runInContext() {
GlobalLock scanLock = GlobalLock.getInternLock("AsyncJobManagerGC");
try {
if (scanLock.lock(ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION)) {
try {
reallyRun();
} finally {
scanLock.unlock();
}
}
} finally {
scanLock.releaseRef();
}
}
public void reallyRun() {
try {
logger.trace("Begin cleanup expired async-jobs");
// forcefully cancel blocking queue items if they've been staying there for too long
List<SyncQueueItemVO> blockItems = _queueMgr.getBlockedQueueItems(JobCancelThresholdMinutes.value() * 60000, false);
if (blockItems != null && blockItems.size() > 0) {
for (SyncQueueItemVO item : blockItems) {
try {
if (item.getContentType().equalsIgnoreCase(SyncQueueItem.AsyncJobContentType)) {
logger.info("Remove Job-" + item.getContentId() + " from Queue-" + item.getId() + " since it has been blocked for too long");
completeAsyncJob(item.getContentId(), JobInfo.Status.FAILED, 0, "Job is cancelled as it has been blocking others for too long");
_jobMonitor.unregisterByJobId(item.getContentId());
}
// purge the item and resume queue processing
_queueMgr.purgeItem(item.getId());
} catch (Throwable e) {
logger.error("Unexpected exception when trying to remove job from sync queue, ", e);
}
}
}
Date cutTime = new Date(DateUtil.currentGMTTime().getTime() - JobExpireMinutes.value() * 60000);
// limit to 100 jobs per turn, this gives cleanup throughput as 600 jobs per minute
// hopefully this will be fast enough to balance potential growth of job table
// 1) Expire unfinished jobs that weren't processed yet
List<AsyncJobVO> unfinishedJobs = _jobDao.getExpiredUnfinishedJobs(cutTime, 100);
for (AsyncJobVO job : unfinishedJobs) {
try {
logger.info("Expunging unfinished job-" + job.getId());
_jobMonitor.unregisterByJobId(job.getId());
expungeAsyncJob(job);
} catch (Throwable e) {
logger.error("Unexpected exception when trying to expunge job-" + job.getId(), e);
}
}
// 2) Expunge finished jobs
List<AsyncJobVO> completedJobs = _jobDao.getExpiredCompletedJobs(cutTime, 100);
for (AsyncJobVO job : completedJobs) {
try {
logger.info("Expunging completed job-" + job.getId());
expungeAsyncJob(job);
} catch (Throwable e) {
logger.error("Unexpected exception when trying to expunge job-" + job.getId(), e);
}
}
logger.trace("End cleanup expired async-jobs");
} catch (Throwable e) {
logger.error("Unexpected exception when trying to execute queue item, ", e);
}
}
};
}
@DB
protected void expungeAsyncJob(final AsyncJobVO job) {
Transaction.execute(new TransactionCallbackNoReturn() {
@Override
public void doInTransactionWithoutResult(TransactionStatus status) {
if ("VmWork".equals(job.getType())) {
_vmWorkJobDao.expunge(job.getId());
}
_jobDao.expunge(job.getId());
// purge corresponding sync queue item
_queueMgr.purgeAsyncJobQueueItemId(job.getId());
}
});
}
private long getMsid() {
return ManagementServerNode.getManagementServerId();
}
@DB
protected List<Long> wakeupByJoinedJobCompletion(long joinedJobId) {
SearchCriteria<Long> joinJobSC = JoinJobSearch.create("joinJobId", joinedJobId);
List<Long> result = _joinMapDao.customSearch(joinJobSC, null);
if (result.size() > 0) {
Collections.sort(result);
Long[] ids = result.toArray(new Long[result.size()]);
final SearchCriteria<AsyncJobVO> jobsSC = JobIdsSearch.create("ids", ids);
final SearchCriteria<SyncQueueItemVO> queueItemsSC = QueueJobIdsSearch.create("contentIds", ids);
AsyncJobVO job = _jobDao.createForUpdate();
job.setPendingSignals(AsyncJob.Constants.SIGNAL_MASK_WAKEUP);
_jobDao.update(job, jobsSC);
SyncQueueItemVO item = _queueItemDao.createForUpdate();
item.setLastProcessNumber(null);
item.setLastProcessMsid(null);
_queueItemDao.update(item, queueItemsSC);
}
return _joinMapDao.findJobsToWake(joinedJobId);
}
@DB
protected List<Long> wakeupScan() {
final Date cutDate = DateUtil.currentGMTTime();
SearchCriteria<Long> sc = JoinJobTimeSearch.create();
sc.setParameters("beginTime", cutDate);
sc.setParameters("endTime", cutDate);
final List<Long> result = _joinMapDao.customSearch(sc, null);
return Transaction.execute(new TransactionCallback<List<Long>>() {
@Override
public List<Long> doInTransaction(TransactionStatus status) {
if (result.size() > 0) {
Collections.sort(result);
Long[] ids = result.toArray(new Long[result.size()]);
AsyncJobVO job = _jobDao.createForUpdate();
job.setPendingSignals(AsyncJob.Constants.SIGNAL_MASK_WAKEUP);
SearchCriteria<AsyncJobVO> sc2 = JobIdsSearch.create("ids", ids);
SearchCriteria<SyncQueueItemVO> queueItemsSC = QueueJobIdsSearch.create("contentIds", ids);
_jobDao.update(job, sc2);
SyncQueueItemVO item = _queueItemDao.createForUpdate();
item.setLastProcessNumber(null);
item.setLastProcessMsid(null);
_queueItemDao.update(item, queueItemsSC);
}
return _joinMapDao.findJobsToWakeBetween(cutDate);
}
});
}
@Override
public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
try {
final Properties dbProps = DbProperties.getDbProperties();
final int cloudMaxActive = Integer.parseInt(dbProps.getProperty("db.cloud.maxActive"));
int apiPoolSize = cloudMaxActive / 2;
int workPoolSize = (cloudMaxActive * 2) / 3;
logger.info("Start AsyncJobManager API executor thread pool in size " + apiPoolSize);
_apiJobExecutor = Executors.newFixedThreadPool(apiPoolSize, new NamedThreadFactory(AsyncJobManager.API_JOB_POOL_THREAD_PREFIX));
logger.info("Start AsyncJobManager Work executor thread pool in size " + workPoolSize);
_workerJobExecutor = Executors.newFixedThreadPool(workPoolSize, new NamedThreadFactory(AsyncJobManager.WORK_JOB_POOL_THREAD_PREFIX));
} catch (final Exception e) {
throw new ConfigurationException("Unable to load db.properties to configure AsyncJobManagerImpl");
}
JoinJobSearch = _joinMapDao.createSearchBuilder(Long.class);
JoinJobSearch.and(JoinJobSearch.entity().getJoinJobId(), Op.EQ, "joinJobId");
JoinJobSearch.selectFields(JoinJobSearch.entity().getJobId());
JoinJobSearch.done();
JoinJobTimeSearch = _joinMapDao.createSearchBuilder(Long.class);
JoinJobTimeSearch.and(JoinJobTimeSearch.entity().getNextWakeupTime(), Op.LT, "beginTime");
JoinJobTimeSearch.and(JoinJobTimeSearch.entity().getExpiration(), Op.GT, "endTime");
JoinJobTimeSearch.selectFields(JoinJobTimeSearch.entity().getJobId()).done();
JobIdsSearch = _jobDao.createSearchBuilder();
JobIdsSearch.and(JobIdsSearch.entity().getId(), Op.IN, "ids").done();
QueueJobIdsSearch = _queueItemDao.createSearchBuilder();
QueueJobIdsSearch.and(QueueJobIdsSearch.entity().getContentId(), Op.IN, "contentIds").done();
JoinJobIdsSearch = _joinMapDao.createSearchBuilder(Long.class);
JoinJobIdsSearch.selectFields(JoinJobIdsSearch.entity().getJobId());
JoinJobIdsSearch.and(JoinJobIdsSearch.entity().getJoinJobId(), Op.EQ, "joinJobId");
JoinJobIdsSearch.and(JoinJobIdsSearch.entity().getJobId(), Op.NIN, "jobIds");
JoinJobIdsSearch.done();
ContentIdsSearch = _queueItemDao.createSearchBuilder(Long.class);
ContentIdsSearch.selectFields(ContentIdsSearch.entity().getContentId()).done();
AsyncJobExecutionContext.init(this, _joinMapDao);
OutcomeImpl.init(this);
return true;
}
private void cleanupLeftOverJobs(final long msid) {
try {
Transaction.execute(new TransactionCallbackNoReturn() {
@Override
public void doInTransactionWithoutResult(TransactionStatus status) {
// purge sync queue item running on this ms node
_queueMgr.cleanupActiveQueueItems(msid, true);
// reset job status for all jobs running on this ms node
final List<AsyncJobVO> jobs = _jobDao.getResetJobs(msid);
for (final AsyncJobVO job : jobs) {
if (logger.isDebugEnabled()) {
logger.debug("Cancel left-over job-" + job.getId());
}
cleanupResources(job);
job.setStatus(JobInfo.Status.FAILED);
job.setResultCode(ApiErrorCode.INTERNAL_ERROR.getHttpCode());
job.setResult("job cancelled because of management server restart or shutdown");
job.setCompleteMsid(msid);
final Date currentGMTTime = DateUtil.currentGMTTime();
job.setLastUpdated(currentGMTTime);
job.setRemoved(currentGMTTime);
_jobDao.update(job.getId(), job);
if (logger.isDebugEnabled()) {
logger.debug("Purge queue item for cancelled job-" + job.getId());
}
_queueMgr.purgeAsyncJobQueueItemId(job.getId());
}
cleanupFailedSnapshotsCreatedWithDefaultStrategy(msid);
}
});
} catch (Throwable e) {
logger.warn("Unexpected exception in cleaning up left over jobs for mamagement server node " + msid, e);
}
}
/*
Cleanup Resources in transition state and move them to appropriate state
This will allow other operation on the resource, instead of being stuck in transition state
*/
protected boolean cleanupResources(AsyncJobVO job) {
try {
ApiCommandResourceType resourceType = ApiCommandResourceType.fromString(job.getInstanceType());
if (resourceType == null) {
logger.warn("Unknown ResourceType. Skip Cleanup: " + job.getInstanceType());
return true;
}
switch (resourceType) {
case Volume:
return cleanupVolume(job.getInstanceId());
case VirtualMachine:
return cleanupVirtualMachine(job.getInstanceId());
case Network:
return cleanupNetwork(job.getInstanceId());
}
} catch (Exception e) {
logger.warn("Error while cleaning up resource: [" + job.getInstanceType().toString() + "] with Id: " + job.getInstanceId(), e);
return false;
}
return true;
}
private boolean cleanupVolume(final long volumeId) {
VolumeInfo vol = volFactory.getVolume(volumeId);
if (vol == null) {
logger.warn("Volume not found. Skip Cleanup. VolumeId: " + volumeId);
return true;
}
if (vol.getState().isTransitional()) {
logger.debug("Cleaning up volume with Id: " + volumeId);
boolean status = vol.stateTransit(Volume.Event.OperationFailed);
cleanupFailedVolumesCreatedFromSnapshots(volumeId);
return status;
}
logger.debug("Volume not in transition state. Skip cleanup. VolumeId: " + volumeId);
return true;
}
private boolean cleanupVirtualMachine(final long vmId) throws Exception {
VMInstanceVO vmInstanceVO = _vmInstanceDao.findById(vmId);
if (vmInstanceVO == null) {
logger.warn("Instance not found. Skip Cleanup. InstanceId: " + vmId);
return true;
}
if (vmInstanceVO.getState().isTransitional()) {
logger.debug("Cleaning up Instance with Id: " + vmId);
return virtualMachineManager.stateTransitTo(vmInstanceVO, VirtualMachine.Event.OperationFailed, vmInstanceVO.getHostId());
}
logger.debug("Instance not in transition state. Skip cleanup. InstanceId: " + vmId);
return true;
}
private boolean cleanupNetwork(final long networkId) throws Exception {
NetworkVO networkVO = networkDao.findById(networkId);
if (networkVO == null) {
logger.warn("Network not found. Skip Cleanup. NetworkId: " + networkId);
return true;
}
if (Network.State.Implementing.equals(networkVO.getState())) {
try {
logger.debug("Cleaning up Network with Id: " + networkId);
return networkOrchestrationService.stateTransitTo(networkVO, Network.Event.OperationFailed);
} catch (final NoTransitionException e) {
networkVO.setState(Network.State.Shutdown);
networkDao.update(networkVO.getId(), networkVO);
}
}
logger.debug("Network not in transition state. Skip cleanup. NetworkId: " + networkId);
return true;
}
private void cleanupFailedVolumesCreatedFromSnapshots(final long volumeId) {
try {
VolumeDetailVO volumeDetail = _volumeDetailsDao.findDetail(volumeId, VolumeService.SNAPSHOT_ID);
if (volumeDetail != null) {
_volumeDetailsDao.removeDetail(volumeId, VolumeService.SNAPSHOT_ID);
_volsDao.remove(volumeId);
}
} catch (Exception e) {
logger.error("Unexpected exception while removing concurrent request meta data :" + e.getLocalizedMessage());
}
}
private void cleanupFailedSnapshotsCreatedWithDefaultStrategy(final long msid) {
final List<SnapshotDetailsVO> snapshotList = _snapshotDetailsDao.findDetails(AsyncJob.Constants.MS_ID, Long.toString(msid), false);
for (final SnapshotDetailsVO snapshotDetailsVO : snapshotList) {
SnapshotInfo snapshot = snapshotFactory.getSnapshotOnPrimaryStore(snapshotDetailsVO.getResourceId());
if (snapshot == null) {
_snapshotDetailsDao.remove(snapshotDetailsVO.getId());
continue;
}
snapshotSrv.processEventOnSnapshotObject(snapshot, Snapshot.Event.OperationFailed);
_snapshotDetailsDao.removeDetail(snapshotDetailsVO.getResourceId(), AsyncJob.Constants.MS_ID);
}
}
@Override
public void onManagementNodeJoined(List<? extends ManagementServerHost> nodeList, long selfNodeId) {
}
@Override
public void onManagementNodeLeft(List<? extends ManagementServerHost> nodeList, long selfNodeId) {
for (final ManagementServerHost msHost : nodeList) {
cleanupLeftOverJobs(msHost.getId());
}
}
@Override
public void onManagementNodeIsolated() {
}
@Override
public boolean start() {
cleanupLeftOverJobs(getMsid());
_heartbeatScheduler.scheduleAtFixedRate(getHeartbeatTask(), HEARTBEAT_INTERVAL, HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS);
_heartbeatScheduler.scheduleAtFixedRate(getGCTask(), GC_INTERVAL, GC_INTERVAL, TimeUnit.MILLISECONDS);
return true;
}
@Override
public boolean stop() {
_heartbeatScheduler.shutdown();
_apiJobExecutor.shutdown();
_workerJobExecutor.shutdown();
return true;
}
private GenericSearchBuilder<SyncQueueItemVO, Long> ContentIdsSearch;
private GenericSearchBuilder<AsyncJobJoinMapVO, Long> JoinJobSearch;
private SearchBuilder<AsyncJobVO> JobIdsSearch;
private SearchBuilder<SyncQueueItemVO> QueueJobIdsSearch;
private GenericSearchBuilder<AsyncJobJoinMapVO, Long> JoinJobIdsSearch;
private GenericSearchBuilder<AsyncJobJoinMapVO, Long> JoinJobTimeSearch;
protected AsyncJobManagerImpl() {
// override default run level for manager components to start this early, otherwise, VirtualMachineManagerImpl will
// get stuck in non-initializing job queue
setRunLevel(ComponentLifecycle.RUN_LEVEL_FRAMEWORK);
}
private void publishOnEventBus(AsyncJob job, String jobEvent) {
_messageBus.publish(null, AsyncJob.Topics.JOB_EVENT_PUBLISH, PublishScope.LOCAL,
new Pair<AsyncJob, String>(job, jobEvent));
}
@Override
public List<AsyncJobVO> findFailureAsyncJobs(String... cmds) {
return _jobDao.getFailureJobsSinceLastMsStart(getMsid(), cmds);
}
@Override
public long countPendingJobs(String havingInfo, String... cmds) {
return _jobDao.countPendingJobs(havingInfo, cmds);
}
// Returns the number of pending jobs for the given Management server msids.
// NOTE: This is the msid and NOT the id
@Override
public long countPendingNonPseudoJobs(Long... msIds) {
return _jobDao.countPendingNonPseudoJobs(msIds);
}
@Override
public void enableAsyncJobs() {
this.asyncJobsEnabled = true;
}
@Override
public void disableAsyncJobs() {
this.asyncJobsEnabled = false;
}
@Override
public boolean isAsyncJobsEnabled() {
return asyncJobsEnabled;
}
}