blob: 3397daa58191ff5c94ebca825d44c4550fa778ee [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.cloudstack.framework.jobs.impl;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import javax.inject.Inject;
import org.apache.cloudstack.framework.jobs.dao.SyncQueueDao;
import org.apache.cloudstack.framework.jobs.dao.SyncQueueItemDao;
import com.cloud.utils.DateUtil;
import com.cloud.utils.component.ManagerBase;
import com.cloud.utils.db.DB;
import com.cloud.utils.db.Transaction;
import com.cloud.utils.db.TransactionCallback;
import com.cloud.utils.db.TransactionCallbackNoReturn;
import com.cloud.utils.db.TransactionStatus;
import com.cloud.utils.exception.CloudRuntimeException;
/**
 * {@link SyncQueueManager} implementation that works on sync queues and their items
 * through {@link SyncQueueDao} and {@link SyncQueueItemDao}.
 *
 * <p>The mutating operations in this class wrap their DAO calls in
 * {@code Transaction.execute(...)}, log any exception that escapes the callback
 * and do not rethrow it to the caller.
 */
public class SyncQueueManagerImpl extends ManagerBase implements SyncQueueManager {
/** DAO used to look up and update the sync queue records. */
@Inject
private SyncQueueDao _syncQueueDao;
/** DAO used to look up, create, update and expunge the sync queue item records. */
@Inject
private SyncQueueItemDao _syncQueueItemDao;
/**
 * Adds a new item to the sync queue identified by {@code (syncObjType, syncObjId)}.
 *
 * <p>Inside a single transaction the queue is ensured (presumably created when it does
 * not exist yet - see {@code SyncQueueDao.ensureQueue}), its size limit is overwritten
 * with {@code queueSizeLimit}, and a new item row stamped with the current GMT time is
 * persisted.
 *
 * @param syncObjType    type of the object the queue synchronizes on
 * @param syncObjId      id of the object the queue synchronizes on
 * @param itemType       content type stored on the new item
 * @param itemId         content id stored on the new item
 * @param queueSizeLimit value written to the queue's size limit
 * @return the queue the item was added to, or {@code null} when any exception
 *         (including the "queue not found" {@link CloudRuntimeException}) was raised;
 *         the exception is logged, not rethrown
 */
@Override
@DB
public SyncQueueVO queue(final String syncObjType, final long syncObjId, final String itemType, final long itemId, final long queueSizeLimit) {
    final TransactionCallback<SyncQueueVO> enqueueWork = new TransactionCallback<SyncQueueVO>() {
        @Override
        public SyncQueueVO doInTransaction(TransactionStatus status) {
            _syncQueueDao.ensureQueue(syncObjType, syncObjId);

            final SyncQueueVO targetQueue = _syncQueueDao.find(syncObjType, syncObjId);
            if (targetQueue == null) {
                throw new CloudRuntimeException("Unable to queue item into DB, DB is full?");
            }

            // The limit supplied by the caller always replaces the stored one.
            targetQueue.setQueueSizeLimit(queueSizeLimit);
            _syncQueueDao.update(targetQueue.getId(), targetQueue);

            final Date createdAt = DateUtil.currentGMTTime();
            final SyncQueueItemVO newItem = new SyncQueueItemVO();
            newItem.setQueueId(targetQueue.getId());
            newItem.setContentType(itemType);
            newItem.setContentId(itemId);
            newItem.setCreated(createdAt);
            _syncQueueItemDao.persist(newItem);

            return targetQueue;
        }
    };

    SyncQueueVO outcome = null;
    try {
        outcome = Transaction.execute(enqueueWork);
    } catch (Exception e) {
        logger.error("Unexpected exception: ", e);
    }
    return outcome;
}
/**
 * Takes the next item from one specific queue and marks it as being processed.
 *
 * <p>When the queue exists, {@link #queueReadyToProcess(SyncQueueVO)} allows it and
 * the DAO returns a next item, the queue's last process number is advanced (starting
 * at 1), its queue size is incremented, and the item is stamped with {@code msid},
 * the new process number and the current GMT time - all within one transaction.
 *
 * @param queueId id of the queue to dequeue from
 * @param msid    value recorded on the item as its last process msid
 * @return the dequeued item, or {@code null} when the queue does not exist, is not
 *         ready, has no next item, or an exception occurred (logged, not rethrown)
 */
@Override
@DB
public SyncQueueItemVO dequeueFromOne(final long queueId, final Long msid) {
    try {
        return Transaction.execute(new TransactionCallback<SyncQueueItemVO>() {
            @Override
            public SyncQueueItemVO doInTransaction(TransactionStatus status) {
                SyncQueueVO queueVO = _syncQueueDao.findById(queueId);
                if (queueVO == null) {
                    logger.error("Sync queue(id: " + queueId + ") does not exist");
                    return null;
                }

                if (!queueReadyToProcess(queueVO)) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("There is a pending process in sync queue(id: " + queueId + ")");
                    }
                    return null;
                }

                SyncQueueItemVO itemVO = _syncQueueItemDao.getNextQueueItem(queueVO.getId());
                if (itemVO == null) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Sync queue (" + queueId + ") is currently empty");
                    }
                    return null;
                }

                // Long.valueOf replaces the deprecated Long(long) constructor.
                final Long previousNumber = queueVO.getLastProcessNumber();
                final Long processNumber = Long.valueOf(previousNumber == null ? 1L : previousNumber + 1L);
                final Date dt = DateUtil.currentGMTTime();

                queueVO.setLastProcessNumber(processNumber);
                queueVO.setLastUpdated(dt);
                queueVO.setQueueSize(queueVO.getQueueSize() + 1);
                _syncQueueDao.update(queueVO.getId(), queueVO);

                itemVO.setLastProcessMsid(msid);
                itemVO.setLastProcessNumber(processNumber);
                itemVO.setLastProcessTime(dt);
                _syncQueueItemDao.update(itemVO.getId(), itemVO);
                return itemVO;
            }
        });
    } catch (Exception e) {
        logger.error("Unexpected exception: ", e);
    }
    return null;
}
/**
 * Takes up to {@code maxItems} candidate items across all queues and marks each
 * eligible one as being processed.
 *
 * <p>A candidate is dequeued only when its queue still exists, the queue passes
 * {@link #queueReadyToProcess(SyncQueueVO)}, the item can still be re-read by id and
 * it has no process number yet. For each dequeued item the queue's last process
 * number is advanced (starting at 1), its queue size is incremented and the item is
 * stamped with {@code msid}, the process number and the current GMT time. All
 * updates happen in one transaction.
 *
 * @param msid     value recorded on each dequeued item as its last process msid
 * @param maxItems upper bound passed to the DAO when fetching candidates
 * @return the dequeued items (possibly empty), or {@code null} when an exception
 *         occurred (logged, not rethrown)
 */
@Override
@DB
public List<SyncQueueItemVO> dequeueFromAny(final Long msid, final int maxItems) {
    final List<SyncQueueItemVO> resultList = new ArrayList<SyncQueueItemVO>();
    try {
        Transaction.execute(new TransactionCallbackNoReturn() {
            @Override
            public void doInTransactionWithoutResult(TransactionStatus status) {
                List<SyncQueueItemVO> candidates = _syncQueueItemDao.getNextQueueItems(maxItems);
                if (candidates == null) {
                    return;
                }
                for (SyncQueueItemVO candidate : candidates) {
                    SyncQueueVO queueVO = _syncQueueDao.findById(candidate.getQueueId());
                    if (queueVO == null) {
                        // Skip just this item: dereferencing a missing queue would throw and
                        // abort the transaction for every other candidate in the batch.
                        logger.error("Sync queue(id: " + candidate.getQueueId() + ") does not exist");
                        continue;
                    }

                    SyncQueueItemVO itemVO = _syncQueueItemDao.findById(candidate.getId());
                    if (!queueReadyToProcess(queueVO) || itemVO == null || itemVO.getLastProcessNumber() != null) {
                        continue;
                    }

                    // Long.valueOf replaces the deprecated Long(long) constructor.
                    final Long previousNumber = queueVO.getLastProcessNumber();
                    final Long processNumber = Long.valueOf(previousNumber == null ? 1L : previousNumber + 1L);
                    final Date dt = DateUtil.currentGMTTime();

                    queueVO.setLastProcessNumber(processNumber);
                    queueVO.setLastUpdated(dt);
                    queueVO.setQueueSize(queueVO.getQueueSize() + 1);
                    _syncQueueDao.update(queueVO.getId(), queueVO);

                    itemVO.setLastProcessMsid(msid);
                    itemVO.setLastProcessNumber(processNumber);
                    itemVO.setLastProcessTime(dt);
                    _syncQueueItemDao.update(candidate.getId(), itemVO);
                    resultList.add(itemVO);
                }
            }
        });
        return resultList;
    } catch (Exception e) {
        logger.error("Unexpected exception: ", e);
    }
    // null (not the partially filled list) is returned on failure, as before.
    return null;
}
/**
 * Removes a queue item and, when that item was active, decrements its queue's size.
 *
 * <p>An item is treated as active when its last process msid is non-null. Nothing
 * happens when the item cannot be found. The work runs in one transaction; any
 * exception is logged and not rethrown.
 *
 * @param queueItemId id of the queue item to expunge
 */
@Override
@DB
public void purgeItem(final long queueItemId) {
    try {
        Transaction.execute(new TransactionCallbackNoReturn() {
            @Override
            public void doInTransactionWithoutResult(TransactionStatus status) {
                SyncQueueItemVO itemVO = _syncQueueItemDao.findById(queueItemId);
                if (itemVO == null) {
                    return;
                }

                SyncQueueVO queueVO = _syncQueueDao.findById(itemVO.getQueueId());
                _syncQueueItemDao.expunge(itemVO.getId());

                // Only an active item contributes to the queue's size.
                if (itemVO.getLastProcessMsid() == null) {
                    return;
                }

                if (queueVO == null) {
                    // Without this guard the NPE below would undo the expunge above.
                    logger.error("Sync queue(id: " + itemVO.getQueueId() + ") does not exist");
                    return;
                }

                queueVO.setLastUpdated(DateUtil.currentGMTTime());
                // decrement the count
                assert (queueVO.getQueueSize() > 0) : "Count reduce happens when it's already <= 0!";
                queueVO.setQueueSize(queueVO.getQueueSize() - 1);
                _syncQueueDao.update(queueVO.getId(), queueVO);
            }
        });
    } catch (Exception e) {
        logger.error("Unexpected exception: ", e);
    }
}
/**
 * Puts a previously dequeued item back into its queue so it can be picked up again.
 *
 * <p>The item's last process msid, number and time are cleared, and the owning
 * queue's size is decremented and its last-updated time refreshed. Nothing happens
 * when the item cannot be found. The work runs in one transaction; any exception is
 * logged and not rethrown.
 *
 * @param queueItemId id of the queue item to return
 */
@Override
@DB
public void returnItem(final long queueItemId) {
    logger.info("Returning queue item " + queueItemId + " back to queue for second try in case of DB deadlock");
    try {
        Transaction.execute(new TransactionCallbackNoReturn() {
            @Override
            public void doInTransactionWithoutResult(TransactionStatus status) {
                SyncQueueItemVO itemVO = _syncQueueItemDao.findById(queueItemId);
                if (itemVO == null) {
                    return;
                }

                SyncQueueVO queueVO = _syncQueueDao.findById(itemVO.getQueueId());

                itemVO.setLastProcessMsid(null);
                itemVO.setLastProcessNumber(null);
                itemVO.setLastProcessTime(null);
                _syncQueueItemDao.update(queueItemId, itemVO);

                if (queueVO == null) {
                    // Without this guard the NPE below would roll back the item reset above.
                    logger.error("Sync queue(id: " + itemVO.getQueueId() + ") does not exist");
                    return;
                }

                // NOTE(review): unlike purgeItem, the size is decremented without checking
                // whether the item was active - confirm callers only return dequeued items.
                queueVO.setQueueSize(queueVO.getQueueSize() - 1);
                queueVO.setLastUpdated(DateUtil.currentGMTTime());
                _syncQueueDao.update(queueVO.getId(), queueVO);
            }
        });
    } catch (Exception e) {
        logger.error("Unexpected exception: ", e);
    }
}
/**
 * Looks up the active queue items by delegating to
 * {@code SyncQueueItemDao.getActiveQueueItems}; both arguments are passed through
 * unchanged.
 *
 * @param msid      forwarded to the DAO
 * @param exclusive forwarded to the DAO
 * @return whatever the DAO returns
 */
@Override
public List<SyncQueueItemVO> getActiveQueueItems(Long msid, boolean exclusive) {
    final List<SyncQueueItemVO> activeItems = _syncQueueItemDao.getActiveQueueItems(msid, exclusive);
    return activeItems;
}
/**
 * Looks up the blocked queue items by delegating to
 * {@code SyncQueueItemDao.getBlockedQueueItems}; both arguments are passed through
 * unchanged.
 *
 * @param thresholdMs forwarded to the DAO (presumably a duration in milliseconds)
 * @param exclusive   forwarded to the DAO
 * @return whatever the DAO returns
 */
@Override
public List<SyncQueueItemVO> getBlockedQueueItems(long thresholdMs, boolean exclusive) {
    final List<SyncQueueItemVO> blockedItems = _syncQueueItemDao.getBlockedQueueItems(thresholdMs, exclusive);
    return blockedItems;
}
/**
 * Tells whether another item of the given queue may be started.
 *
 * <p>The queue is ready while its number of active items (as counted by the item
 * DAO) is below the queue's size limit; otherwise a debug message is emitted.
 *
 * @param queueVO the queue to check; must not be {@code null}
 * @return {@code true} when the active item count is below the queue's size limit
 */
private boolean queueReadyToProcess(SyncQueueVO queueVO) {
    final int activeCount = _syncQueueItemDao.getActiveQueueItemCount(queueVO.getId());
    final boolean belowLimit = activeCount < queueVO.getQueueSizeLimit();

    if (!belowLimit && logger.isDebugEnabled()) {
        logger.debug("Queue (queue id, sync type, sync id) - (" + queueVO.getId()
                + "," + queueVO.getSyncObjType() + ", " + queueVO.getSyncObjId()
                + ") is reaching concurrency limit " + queueVO.getQueueSizeLimit());
    }
    return belowLimit;
}
/**
 * Purges the queue item whose content is the given async job, if there is one.
 *
 * <p>The item id is resolved through the item DAO using the job id and
 * {@code SyncQueueItem.AsyncJobContentType}; when no id comes back nothing is done.
 *
 * @param asyncJobId id of the async job whose queue item should be purged
 */
@Override
public void purgeAsyncJobQueueItemId(long asyncJobId) {
    final Long queueItemId =
            _syncQueueItemDao.getQueueItemIdByContentIdAndType(asyncJobId, SyncQueueItem.AsyncJobContentType);
    if (queueItemId == null) {
        return;
    }
    purgeItem(queueItemId);
}
/**
 * Discards every active queue item returned for {@code msid} by purging them one by
 * one, logging each at info level.
 *
 * @param msid      passed to {@link #getActiveQueueItems(Long, boolean)}
 * @param exclusive not used by this method: the lookup is always performed with
 *                  {@code exclusive = false}
 */
@Override
public void cleanupActiveQueueItems(Long msid, boolean exclusive) {
    final List<SyncQueueItemVO> leftovers = getActiveQueueItems(msid, false);
    for (final SyncQueueItemVO leftover : leftovers) {
        if (logger.isInfoEnabled()) {
            logger.info("Discard left-over queue item: " + leftover.toString());
        }
        // Each purge runs in its own transaction (see purgeItem).
        purgeItem(leftover.getId());
    }
}
}