// blob: a2f1f36b8637c3d71b6622a45063281935768757 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.cloudstack.framework.jobs.dao;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Date;
import java.util.List;
import org.apache.cloudstack.api.ApiConstants;
import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
import org.apache.cloudstack.jobs.JobInfo;
import com.cloud.utils.db.DB;
import com.cloud.utils.db.Filter;
import com.cloud.utils.db.GenericDaoBase;
import com.cloud.utils.db.GenericSearchBuilder;
import com.cloud.utils.db.SearchBuilder;
import com.cloud.utils.db.SearchCriteria;
import com.cloud.utils.db.SearchCriteria.Op;
import com.cloud.utils.db.TransactionLegacy;
/**
 * DAO implementation for {@link AsyncJobVO} rows.
 *
 * <p>All search templates are built once in the constructor and reused by the
 * query methods, each of which creates a fresh {@link SearchCriteria} from the
 * relevant template and binds only the parameters it needs.
 */
public class AsyncJobDaoImpl extends GenericDaoBase<AsyncJobVO, Long> implements AsyncJobDao {

    // Entity-returning search templates.
    private final SearchBuilder<AsyncJobVO> pendingAsyncJobSearch;
    private final SearchBuilder<AsyncJobVO> pendingAsyncJobsSearch;
    private final SearchBuilder<AsyncJobVO> expiringAsyncJobSearch;
    private final SearchBuilder<AsyncJobVO> pseudoJobSearch;
    private final SearchBuilder<AsyncJobVO> pseudoJobCleanupSearch;
    private final SearchBuilder<AsyncJobVO> expiringUnfinishedAsyncJobSearch;
    private final SearchBuilder<AsyncJobVO> expiringCompletedAsyncJobSearch;
    private final SearchBuilder<AsyncJobVO> failureMsidAsyncJobSearch;

    // COUNT(id) search templates.
    private final GenericSearchBuilder<AsyncJobVO, Long> asyncJobTypeSearch;
    private final GenericSearchBuilder<AsyncJobVO, Long> pendingNonPseudoAsyncJobsSearch;

    /**
     * Builds every search template used by this DAO.
     */
    public AsyncJobDaoImpl() {
        // instanceType = ? AND instanceId = ? AND status = ?
        pendingAsyncJobSearch = createSearchBuilder();
        pendingAsyncJobSearch.and("instanceType", pendingAsyncJobSearch.entity().getInstanceType(), Op.EQ);
        pendingAsyncJobSearch.and("instanceId", pendingAsyncJobSearch.entity().getInstanceId(), Op.EQ);
        pendingAsyncJobSearch.and("status", pendingAsyncJobSearch.entity().getStatus(), Op.EQ);
        pendingAsyncJobSearch.done();

        // created <= ?
        expiringAsyncJobSearch = createSearchBuilder();
        expiringAsyncJobSearch.and("created", expiringAsyncJobSearch.entity().getCreated(), Op.LTEQ);
        expiringAsyncJobSearch.done();

        // instanceType = ? AND accountId = ? AND status = ?
        pendingAsyncJobsSearch = createSearchBuilder();
        pendingAsyncJobsSearch.and("instanceType", pendingAsyncJobsSearch.entity().getInstanceType(), Op.EQ);
        pendingAsyncJobsSearch.and("accountId", pendingAsyncJobsSearch.entity().getAccountId(), Op.EQ);
        pendingAsyncJobsSearch.and("status", pendingAsyncJobsSearch.entity().getStatus(), Op.EQ);
        pendingAsyncJobsSearch.done();

        // dispatcher != ? AND created <= ? AND completeMsid IS NULL AND status = ?
        expiringUnfinishedAsyncJobSearch = createSearchBuilder();
        expiringUnfinishedAsyncJobSearch.and("jobDispatcher", expiringUnfinishedAsyncJobSearch.entity().getDispatcher(), Op.NEQ);
        expiringUnfinishedAsyncJobSearch.and("created", expiringUnfinishedAsyncJobSearch.entity().getCreated(), Op.LTEQ);
        expiringUnfinishedAsyncJobSearch.and("completeMsId", expiringUnfinishedAsyncJobSearch.entity().getCompleteMsid(), Op.NULL);
        expiringUnfinishedAsyncJobSearch.and("jobStatus", expiringUnfinishedAsyncJobSearch.entity().getStatus(), Op.EQ);
        expiringUnfinishedAsyncJobSearch.done();

        // removed <= ? AND completeMsid IS NOT NULL AND status != ?
        expiringCompletedAsyncJobSearch = createSearchBuilder();
        expiringCompletedAsyncJobSearch.and(ApiConstants.REMOVED, expiringCompletedAsyncJobSearch.entity().getRemoved(), Op.LTEQ);
        expiringCompletedAsyncJobSearch.and("completeMsId", expiringCompletedAsyncJobSearch.entity().getCompleteMsid(), Op.NNULL);
        expiringCompletedAsyncJobSearch.and("jobStatus", expiringCompletedAsyncJobSearch.entity().getStatus(), Op.NEQ);
        expiringCompletedAsyncJobSearch.done();

        // dispatcher = ? AND instanceType = ? AND instanceId = ?
        pseudoJobSearch = createSearchBuilder();
        pseudoJobSearch.and("jobDispatcher", pseudoJobSearch.entity().getDispatcher(), Op.EQ);
        pseudoJobSearch.and("instanceType", pseudoJobSearch.entity().getInstanceType(), Op.EQ);
        pseudoJobSearch.and("instanceId", pseudoJobSearch.entity().getInstanceId(), Op.EQ);
        pseudoJobSearch.done();

        // initMsid = ?
        pseudoJobCleanupSearch = createSearchBuilder();
        pseudoJobCleanupSearch.and("initMsid", pseudoJobCleanupSearch.entity().getInitMsid(), Op.EQ);
        pseudoJobCleanupSearch.done();

        // initMsid = ? AND instanceType = ? AND status = ? AND cmd IN (?)
        failureMsidAsyncJobSearch = createSearchBuilder();
        failureMsidAsyncJobSearch.and("initMsid", failureMsidAsyncJobSearch.entity().getInitMsid(), Op.EQ);
        failureMsidAsyncJobSearch.and("instanceType", failureMsidAsyncJobSearch.entity().getInstanceType(), Op.EQ);
        failureMsidAsyncJobSearch.and("status", failureMsidAsyncJobSearch.entity().getStatus(), Op.EQ);
        failureMsidAsyncJobSearch.and("job_cmd", failureMsidAsyncJobSearch.entity().getCmd(), Op.IN);
        failureMsidAsyncJobSearch.done();

        // COUNT(id) WHERE cmdInfo LIKE ? AND cmd IN (?) AND status = ?
        asyncJobTypeSearch = createSearchBuilder(Long.class);
        asyncJobTypeSearch.select(null, SearchCriteria.Func.COUNT, asyncJobTypeSearch.entity().getId());
        asyncJobTypeSearch.and("job_info", asyncJobTypeSearch.entity().getCmdInfo(), Op.LIKE);
        asyncJobTypeSearch.and("job_cmd", asyncJobTypeSearch.entity().getCmd(), Op.IN);
        asyncJobTypeSearch.and("status", asyncJobTypeSearch.entity().getStatus(), Op.EQ);
        asyncJobTypeSearch.done();

        // COUNT(id) WHERE instanceType != ? AND status = ? AND executingMsid IN (?)
        pendingNonPseudoAsyncJobsSearch = createSearchBuilder(Long.class);
        pendingNonPseudoAsyncJobsSearch.select(null, SearchCriteria.Func.COUNT, pendingNonPseudoAsyncJobsSearch.entity().getId());
        pendingNonPseudoAsyncJobsSearch.and("instanceTypeNEQ", pendingNonPseudoAsyncJobsSearch.entity().getInstanceType(), Op.NEQ);
        pendingNonPseudoAsyncJobsSearch.and("jobStatusEQ", pendingNonPseudoAsyncJobsSearch.entity().getStatus(), Op.EQ);
        pendingNonPseudoAsyncJobsSearch.and("executingMsidIN", pendingNonPseudoAsyncJobsSearch.entity().getExecutingMsid(), Op.IN);
        // Finalise this template like every other one built above.
        pendingNonPseudoAsyncJobsSearch.done();
    }

    /**
     * Finds a job in {@code IN_PROGRESS} status for the given instance.
     *
     * <p>The lookup goes through {@code listIncludingRemovedBy}. If several
     * rows match, a warning is logged and the first one is returned.
     *
     * @param instanceType instance type to match
     * @param instanceId   instance id to match
     * @return the first matching job, or {@code null} when none matches
     */
    @Override
    public AsyncJobVO findInstancePendingAsyncJob(String instanceType, long instanceId) {
        SearchCriteria<AsyncJobVO> sc = pendingAsyncJobSearch.create();
        sc.setParameters("instanceType", instanceType);
        sc.setParameters("instanceId", instanceId);
        sc.setParameters("status", JobInfo.Status.IN_PROGRESS);

        List<AsyncJobVO> l = listIncludingRemovedBy(sc);
        if (l != null && !l.isEmpty()) {
            if (l.size() > 1) {
                logger.warn("Instance " + instanceType + "-" + instanceId + " has multiple pending async-job");
            }
            return l.get(0);
        }
        return null;
    }

    /**
     * Lists jobs in {@code IN_PROGRESS} status for an instance type,
     * optionally restricted to one account.
     *
     * @param instanceType instance type to match
     * @param accountId    account to match; when {@code null} the account
     *                     parameter is simply not bound
     * @return the result of {@code listBy} for the criteria
     */
    @Override
    public List<AsyncJobVO> findInstancePendingAsyncJobs(String instanceType, Long accountId) {
        SearchCriteria<AsyncJobVO> sc = pendingAsyncJobsSearch.create();
        sc.setParameters("instanceType", instanceType);
        if (accountId != null) {
            sc.setParameters("accountId", accountId);
        }
        sc.setParameters("status", JobInfo.Status.IN_PROGRESS);
        return listBy(sc);
    }

    /**
     * Finds the pseudo job whose instance id equals the given thread id.
     *
     * <p>NOTE(review): {@code msid} is accepted but not used in the criteria,
     * so rows are matched by dispatcher, instance type and thread id only.
     * Confirm whether the lookup should also be scoped to the management
     * server.
     *
     * @param threadId thread id stored as the job's instance id
     * @param msid     management server id (currently unused by the query)
     * @return the first matching pseudo job, or {@code null} when none matches
     */
    @Override
    public AsyncJobVO findPseudoJob(long threadId, long msid) {
        SearchCriteria<AsyncJobVO> sc = pseudoJobSearch.create();
        sc.setParameters("jobDispatcher", AsyncJobVO.JOB_DISPATCHER_PSEUDO);
        sc.setParameters("instanceType", AsyncJobVO.PSEUDO_JOB_INSTANCE_TYPE);
        sc.setParameters("instanceId", threadId);

        List<AsyncJobVO> result = listBy(sc);
        if (result != null && !result.isEmpty()) {
            assert (result.size() == 1);
            return result.get(0);
        }
        return null;
    }

    /**
     * Expunges the rows whose {@code initMsid} equals the given id.
     *
     * <p>NOTE(review): the criteria filter on {@code initMsid} only; nothing
     * here restricts the match to the pseudo dispatcher. Verify this is the
     * intended scope.
     *
     * @param msid management server id to match against {@code initMsid}
     */
    @Override
    public void cleanupPseduoJobs(long msid) {
        SearchCriteria<AsyncJobVO> sc = pseudoJobCleanupSearch.create();
        sc.setParameters("initMsid", msid);
        this.expunge(sc);
    }

    /**
     * Lists jobs created at or before {@code cutTime}, ordered by
     * {@code created}, starting at offset 0 and capped at {@code limit} rows.
     *
     * @param cutTime upper bound (inclusive) on the creation time
     * @param limit   maximum number of rows requested
     * @return matching jobs, via {@code listIncludingRemovedBy}
     */
    @Override
    public List<AsyncJobVO> getExpiredJobs(Date cutTime, int limit) {
        SearchCriteria<AsyncJobVO> sc = expiringAsyncJobSearch.create();
        sc.setParameters("created", cutTime);
        Filter filter = new Filter(AsyncJobVO.class, "created", true, 0L, (long)limit);
        return listIncludingRemovedBy(sc, filter);
    }

    /**
     * Lists non-pseudo jobs created at or before {@code cutTime} that are
     * still {@code IN_PROGRESS} and have no completing management server id.
     *
     * @param cutTime upper bound (inclusive) on the creation time
     * @param limit   maximum number of rows requested
     * @return matching jobs ordered by {@code created}, via
     *         {@code listIncludingRemovedBy}
     */
    @Override
    public List<AsyncJobVO> getExpiredUnfinishedJobs(Date cutTime, int limit) {
        SearchCriteria<AsyncJobVO> sc = expiringUnfinishedAsyncJobSearch.create();
        sc.setParameters("jobDispatcher", AsyncJobVO.JOB_DISPATCHER_PSEUDO);
        sc.setParameters("created", cutTime);
        sc.setParameters("jobStatus", JobInfo.Status.IN_PROGRESS);
        Filter filter = new Filter(AsyncJobVO.class, "created", true, 0L, (long)limit);
        return listIncludingRemovedBy(sc, filter);
    }

    /**
     * Lists jobs removed at or before {@code cutTime} that are no longer
     * {@code IN_PROGRESS} and have a completing management server id.
     *
     * @param cutTime upper bound (inclusive) on the removal time
     * @param limit   maximum number of rows requested
     * @return matching jobs ordered by the removed column, via
     *         {@code listIncludingRemovedBy}
     */
    @Override
    public List<AsyncJobVO> getExpiredCompletedJobs(final Date cutTime, final int limit) {
        final SearchCriteria<AsyncJobVO> sc = expiringCompletedAsyncJobSearch.create();
        sc.setParameters(ApiConstants.REMOVED, cutTime);
        // Bound to a NEQ condition: selects everything that is not in progress.
        sc.setParameters("jobStatus", JobInfo.Status.IN_PROGRESS);
        final Filter filter = new Filter(AsyncJobVO.class, ApiConstants.REMOVED, true, 0L, (long)limit);
        return listIncludingRemovedBy(sc, filter);
    }

    /**
     * Marks the in-progress jobs of a management server as failed with a
     * single SQL UPDATE on {@code async_job}.
     *
     * <p>A row is updated when its status is {@code IN_PROGRESS} and either
     * its executing msid equals {@code msid}, or its executing msid is NULL
     * and its init msid equals {@code msid}. Any failure is logged at warn
     * level and not propagated to the caller.
     *
     * @param msid             management server id whose jobs are reset
     * @param jobResultCode    value written to {@code job_result_code}
     * @param jobResultMessage value written to {@code job_result}
     */
    @Override
    @DB
    public void resetJobProcess(long msid, int jobResultCode, String jobResultMessage) {
        String sql = "UPDATE async_job SET job_status=?, job_result_code=?, job_result=? where job_status=? AND (job_executing_msid=? OR (job_executing_msid IS NULL AND job_init_msid=?))";
        TransactionLegacy txn = TransactionLegacy.currentTxn();
        try {
            PreparedStatement pstmt = txn.prepareAutoCloseStatement(sql);
            // Statuses are bound as the ordinal of the enum constant.
            pstmt.setInt(1, JobInfo.Status.FAILED.ordinal());
            pstmt.setInt(2, jobResultCode);
            pstmt.setString(3, jobResultMessage);
            pstmt.setInt(4, JobInfo.Status.IN_PROGRESS.ordinal());
            pstmt.setLong(5, msid);
            pstmt.setLong(6, msid);
            pstmt.execute();
        } catch (SQLException e) {
            logger.warn("Unable to reset job status for management server " + msid, e);
        } catch (Throwable e) {
            // Best-effort catch-all: anything else is logged the same way and swallowed.
            logger.warn("Unable to reset job status for management server " + msid, e);
        }
    }

    /**
     * Lists the {@code IN_PROGRESS} jobs that belong to a management server,
     * using the same predicate as {@link #resetJobProcess(long, int, String)}.
     *
     * @param msid management server id
     * @return matching jobs ordered by {@code created}, via
     *         {@code listIncludingRemovedBy}
     */
    @Override
    public List<AsyncJobVO> getResetJobs(long msid) {
        // Only the status parameter of the shared template is bound here.
        SearchCriteria<AsyncJobVO> sc = pendingAsyncJobSearch.create();
        sc.setParameters("status", JobInfo.Status.IN_PROGRESS);

        // construct query: (job_executing_msid=msid OR (job_executing_msid IS NULL AND job_init_msid=msid))
        SearchCriteria<AsyncJobVO> msQuery = createSearchCriteria();
        msQuery.addOr("executingMsid", Op.EQ, msid);
        SearchCriteria<AsyncJobVO> initMsQuery = createSearchCriteria();
        initMsQuery.addAnd("executingMsid", Op.NULL);
        initMsQuery.addAnd("initMsid", Op.EQ, msid);
        msQuery.addOr("initMsid", Op.SC, initMsQuery);
        sc.addAnd("executingMsid", Op.SC, msQuery);

        Filter filter = new Filter(AsyncJobVO.class, "created", true, null, null);
        return listIncludingRemovedBy(sc, filter);
    }

    /**
     * Lists {@code FAILED} jobs initiated by the given management server
     * whose command is one of {@code cmds}.
     *
     * @param msId management server id matched against {@code initMsid}
     * @param cmds command names matched with an IN condition
     * @return the result of {@code listBy} for the criteria
     */
    @Override
    public List<AsyncJobVO> getFailureJobsSinceLastMsStart(long msId, String... cmds) {
        SearchCriteria<AsyncJobVO> sc = failureMsidAsyncJobSearch.create();
        sc.setParameters("initMsid", msId);
        sc.setParameters("status", AsyncJobVO.Status.FAILED);
        sc.setParameters("job_cmd", (Object[])cmds);
        return listBy(sc);
    }

    /**
     * Returns the number of pending jobs for the given Management server msids.
     * NOTE: This is the msid and NOT the id.
     *
     * <p>Counts rows that are {@code IN_PROGRESS} and whose instance type is
     * not the pseudo-job instance type.
     *
     * @param msIds management server msids matched against the executing
     *              msid; when {@code null} the IN parameter is not bound
     * @return the first element of the COUNT search result
     */
    @Override
    public long countPendingNonPseudoJobs(Long... msIds) {
        SearchCriteria<Long> sc = pendingNonPseudoAsyncJobsSearch.create();
        sc.setParameters("instanceTypeNEQ", AsyncJobVO.PSEUDO_JOB_INSTANCE_TYPE);
        sc.setParameters("jobStatusEQ", JobInfo.Status.IN_PROGRESS);
        if (msIds != null) {
            sc.setParameters("executingMsidIN", (Object[])msIds);
        }
        List<Long> results = customSearch(sc, null);
        return results.get(0);
    }

    /**
     * Counts {@code IN_PROGRESS} jobs whose command is one of {@code cmds},
     * optionally requiring the command info to contain a substring.
     *
     * @param havingInfo substring matched as {@code %havingInfo%} against the
     *                   command info; when {@code null} the LIKE parameter is
     *                   not bound
     * @param cmds       command names matched with an IN condition
     * @return the first element of the COUNT search result
     */
    @Override
    public long countPendingJobs(String havingInfo, String... cmds) {
        SearchCriteria<Long> sc = asyncJobTypeSearch.create();
        sc.setParameters("status", JobInfo.Status.IN_PROGRESS);
        sc.setParameters("job_cmd", (Object[])cmds);
        if (havingInfo != null) {
            sc.setParameters("job_info", "%" + havingInfo + "%");
        }
        List<Long> results = customSearch(sc, null);
        return results.get(0);
    }
}