// blob: 889d0ce14ccc986367b0b09c9db66f008e1f9c49 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.storage.cache.manager;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import javax.naming.ConfigurationException;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.cloudstack.engine.subsystem.api.storage.CopyCommandResult;
import org.apache.cloudstack.engine.subsystem.api.storage.DataMotionService;
import org.apache.cloudstack.engine.subsystem.api.storage.DataObject;
import org.apache.cloudstack.engine.subsystem.api.storage.DataObjectInStore;
import org.apache.cloudstack.engine.subsystem.api.storage.DataStore;
import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreManager;
import org.apache.cloudstack.engine.subsystem.api.storage.ObjectInDataStoreStateMachine;
import org.apache.cloudstack.engine.subsystem.api.storage.ObjectInDataStoreStateMachine.Event;
import org.apache.cloudstack.engine.subsystem.api.storage.ObjectInDataStoreStateMachine.State;
import org.apache.cloudstack.engine.subsystem.api.storage.Scope;
import org.apache.cloudstack.engine.subsystem.api.storage.StorageCacheManager;
import org.apache.cloudstack.framework.async.AsyncCallFuture;
import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
import org.apache.cloudstack.managed.context.ManagedContextRunnable;
import org.apache.cloudstack.storage.cache.allocator.StorageCacheAllocator;
import org.apache.cloudstack.storage.datastore.ObjectInDataStoreManager;
import org.apache.cloudstack.storage.datastore.db.ImageStoreVO;
import com.cloud.agent.api.to.DataObjectType;
import com.cloud.configuration.Config;
import com.cloud.storage.DataStoreRole;
import com.cloud.utils.NumbersUtil;
import com.cloud.utils.component.Manager;
import com.cloud.utils.concurrency.NamedThreadFactory;
import com.cloud.utils.db.GlobalLock;
import com.cloud.utils.db.QueryBuilder;
import com.cloud.utils.db.SearchCriteria;
import com.cloud.utils.exception.CloudRuntimeException;
public class StorageCacheManagerImpl implements StorageCacheManager, Manager {
// Logger named after the runtime class of this instance.
protected Logger logger = LogManager.getLogger(getClass());
// Allocators consulted in iteration order by getCacheStorage(); the first non-null store wins.
@Inject
List<StorageCacheAllocator> storageCacheAllocator;
// Used by createCacheObject() to copy the source object onto the cache store.
@Inject
DataMotionService dataMotionSvr;
// Used by createCacheObject() to look up an object's entry on a given store.
@Inject
ObjectInDataStoreManager objectInStoreMgr;
// Used by getCacheStores() to resolve image-store rows into DataStore instances.
@Inject
DataStoreManager dataStoreManager;
// Picks the next cached object to remove in CacheReplacementRunner.
@Inject
StorageCacheReplacementAlgorithm cacheReplacementAlgorithm;
// Source of the settings read in configure().
@Inject
ConfigurationDao configDao;
// Whether start() schedules the replacement task; overwritten in configure().
Boolean cacheReplacementEnabled = Boolean.TRUE;
// Size of the scheduled thread pool; read from Config.ExpungeWorkers in configure().
int workers;
// Scheduled pool created in configure() and used by start().
ScheduledExecutorService executors;
// Delay between replacement runs, in seconds (start() schedules with TimeUnit.SECONDS).
int cacheReplaceMentInterval;
// Static monitors used by createCacheObject(): one per supported DataObjectType, so all
// cache-entry checks for a given type share a single lock.
private static final Object templateLock = new Object();
private static final Object volumeLock = new Object();
private static final Object snapshotLock = new Object();
/**
 * Finds a cache store for the given scope.
 *
 * Asks each configured allocator in turn and stops at the first one that
 * offers a store.
 *
 * @param scope scope the cache store must serve
 * @return the first store offered by an allocator, or {@code null} if none offers one
 */
@Override
public DataStore getCacheStorage(Scope scope) {
    DataStore selected = null;
    for (StorageCacheAllocator candidate : storageCacheAllocator) {
        selected = candidate.getCacheStore(scope);
        if (selected != null) {
            break;
        }
    }
    return selected;
}
/**
 * Finds a cache store for the given data object and scope.
 *
 * Allocators are consulted in list order; allocators that return
 * {@code null} are skipped.
 *
 * @param data  object that is to be cached
 * @param scope scope the cache store must serve
 * @return the first store offered by an allocator, or {@code null} if none offers one
 */
@Override
public DataStore getCacheStorage(DataObject data, Scope scope) {
    for (StorageCacheAllocator candidate : storageCacheAllocator) {
        final DataStore found = candidate.getCacheStore(data, scope);
        if (found == null) {
            continue;
        }
        return found;
    }
    return null;
}
/**
 * Lists every data store whose role is {@link DataStoreRole#ImageCache}.
 *
 * @return a new mutable list with one {@link DataStore} per matching image-store row
 */
protected List<DataStore> getCacheStores() {
    final QueryBuilder<ImageStoreVO> query = QueryBuilder.create(ImageStoreVO.class);
    query.and(query.entity().getRole(), SearchCriteria.Op.EQ, DataStoreRole.ImageCache);

    final List<DataStore> cacheStores = new ArrayList<DataStore>();
    for (final ImageStoreVO row : query.list()) {
        // Resolve each row through the manager using its own id and role.
        cacheStores.add(dataStoreManager.getDataStore(row.getId(), row.getRole()));
    }
    return cacheStores;
}
/**
 * Not implemented: this manager does not keep a name.
 *
 * @return always {@code null}
 */
@Override
public String getName() {
// TODO: not implemented; no name is stored.
return null;
}
/**
 * Not implemented: the supplied name is ignored.
 */
@Override
public void setName(String name) {
// TODO: not implemented; intentionally a no-op.
}
/**
 * Not implemented: the supplied parameters are ignored.
 */
@Override
public void setConfigParams(Map<String, Object> params) {
// TODO: not implemented; intentionally a no-op.
}
/**
 * Not implemented: no configuration parameters are kept.
 *
 * @return always {@code null}
 */
@Override
public Map<String, Object> getConfigParams() {
// TODO: not implemented; no parameters are stored.
return null;
}
/**
 * Not implemented: no run level is kept.
 *
 * @return always {@code 0}
 */
@Override
public int getRunLevel() {
// TODO: not implemented; a fixed value is returned.
return 0;
}
/**
 * Not implemented: the supplied run level is ignored.
 */
@Override
public void setRunLevel(int level) {
// TODO: not implemented; intentionally a no-op.
}
/**
 * Loads the cache-replacement settings and creates the scheduled thread pool.
 *
 * Reads the enabled flag, the replacement interval (seconds) and the worker
 * count from {@code configDao}. Values that cannot be parsed, and values that
 * would later be rejected by {@code Random.nextInt},
 * {@code scheduleWithFixedDelay} or {@code newScheduledThreadPool}
 * (a non-positive interval or worker count), are replaced by the defaults
 * of 86400 seconds and 10 workers.
 *
 * @param name   unused
 * @param params unused
 * @return always {@code true}
 * @throws ConfigurationException declared by the interface; not thrown here
 */
@Override
public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
    final int defaultIntervalSeconds = 86400;
    final int defaultWorkers = 10;

    cacheReplacementEnabled = Boolean.parseBoolean(configDao.getValue(Config.StorageCacheReplacementEnabled.key()));

    cacheReplaceMentInterval = NumbersUtil.parseInt(configDao.getValue(Config.StorageCacheReplacementInterval.key()), defaultIntervalSeconds);
    if (cacheReplaceMentInterval <= 0) {
        // start() feeds this value to Random.nextInt(bound) and scheduleWithFixedDelay(delay),
        // both of which throw IllegalArgumentException for values <= 0.
        logger.warn("Invalid value " + cacheReplaceMentInterval + " for " + Config.StorageCacheReplacementInterval.key() +
                ", falling back to " + defaultIntervalSeconds + " seconds");
        cacheReplaceMentInterval = defaultIntervalSeconds;
    }

    workers = NumbersUtil.parseInt(configDao.getValue(Config.ExpungeWorkers.key()), defaultWorkers);
    if (workers <= 0) {
        // A negative pool size is rejected by newScheduledThreadPool; zero leaves no core thread.
        logger.warn("Invalid value " + workers + " for " + Config.ExpungeWorkers.key() +
                ", falling back to " + defaultWorkers + " workers");
        workers = defaultWorkers;
    }

    executors = Executors.newScheduledThreadPool(workers, new NamedThreadFactory("StorageCacheManager-cache-replacement"));
    return true;
}
/**
 * Periodic task that removes cached objects chosen by the replacement algorithm.
 *
 * Each run takes the "storageCacheMgr.replacement" global lock (waiting up to
 * 3 for it, in the unit defined by {@code GlobalLock.lock}), visits the cache
 * stores in random order until one yields a candidate, then keeps deleting
 * candidates from that store until the algorithm returns none. Stores after
 * the first one with a candidate are not processed in the same run.
 */
protected class CacheReplacementRunner extends ManagedContextRunnable {
    @Override
    protected void runInContext() {
        GlobalLock replacementLock = null;
        try {
            replacementLock = GlobalLock.getInternLock("storageCacheMgr.replacement");
            if (replacementLock.lock(3)) {
                try {
                    List<DataStore> stores = getCacheStores();
                    Collections.shuffle(stores);
                    DataObject object = null;
                    DataStore findAStore = null;
                    for (DataStore store : stores) {
                        object = cacheReplacementAlgorithm.chooseOneToBeReplaced(store);
                        findAStore = store;
                        if (object != null) {
                            break;
                        }
                    }
                    // Drain the selected store; the loop is skipped when no store had a candidate.
                    while (object != null) {
                        object.delete();
                        object = cacheReplacementAlgorithm.chooseOneToBeReplaced(findAStore);
                    }
                } finally {
                    // Only unlock when the lock was actually acquired.
                    replacementLock.unlock();
                }
            }
        } catch (Exception e) {
            // Swallowed on purpose: an exception escaping a task scheduled with
            // scheduleWithFixedDelay suppresses all later runs. Keep the stack trace.
            logger.warn("Failed to execute CacheReplacementRunner: " + e.toString(), e);
        } finally {
            if (replacementLock != null) {
                // Balance the reference taken by getInternLock().
                replacementLock.releaseRef();
            }
        }
    }
}
/**
 * Schedules the periodic cache-replacement task when replacement is enabled.
 *
 * The first run is delayed by a random number of seconds below the configured
 * interval; later runs follow with the interval as a fixed delay.
 *
 * @return always {@code true}
 */
@Override
public boolean start() {
    if (!cacheReplacementEnabled) {
        return true;
    }
    final int initialDelay = new Random().nextInt(cacheReplaceMentInterval);
    executors.scheduleWithFixedDelay(new CacheReplacementRunner(), initialDelay, cacheReplaceMentInterval, TimeUnit.SECONDS);
    return true;
}
/**
 * Stops the manager and shuts down the scheduled thread pool.
 *
 * Uses an orderly {@code shutdown()}: no new runs are started, while a
 * replacement run already in progress is allowed to finish. Safe to call
 * when {@code configure} never ran and no pool exists.
 *
 * @return always {@code true}
 */
@Override
public boolean stop() {
    if (executors != null) {
        executors.shutdown();
    }
    return true;
}
/**
 * Returns a cached copy of {@code data} on {@code store}, creating it if needed.
 *
 * The entry check runs under a static per-type monitor (template, volume or
 * snapshot). If an entry exists and is still being prepared (Allocated,
 * Creating or Copying), the caller waits in 10 second slices, for at most
 * 3600 seconds, until the state changes. A Ready entry is reused and its
 * reference count incremented. Otherwise a new entry is created under the
 * lock, and the data is then copied outside the lock through
 * {@code dataMotionSvr}.
 *
 * @param data  source object to cache; must be a template, volume or snapshot
 * @param store cache store that should hold the copy
 * @return the cached object with its reference count incremented, or
 *         {@code null} if the entry could not be created or the copy failed
 * @throws CloudRuntimeException if the object type is unsupported, the wait
 *         exceeds the timeout, or the copy is interrupted or fails with an
 *         execution error
 */
@Override
public DataObject createCacheObject(DataObject data, DataStore store) {
    DataObject objOnCacheStore = null;
    final Object lock;
    final DataObjectType type = data.getType();
    final String typeName;
    final long storeId = store.getId();
    final long dataId = data.getId();

    /*
     * Make sure any thread knows own lock type.
     */
    if (type == DataObjectType.TEMPLATE) {
        lock = templateLock;
        typeName = "template";
    } else if (type == DataObjectType.VOLUME) {
        lock = volumeLock;
        typeName = "volume";
    } else if (type == DataObjectType.SNAPSHOT) {
        lock = snapshotLock;
        typeName = "snapshot";
    } else {
        String msg = "unsupported DataObject comes, then can't acquire correct lock object";
        throw new CloudRuntimeException(msg);
    }
    logger.debug("check " + typeName + " cache entry(id: " + dataId + ") on store(id: " + storeId + ")");

    DataObject existingDataObj = null;
    synchronized (lock) {
        DataObjectInStore obj = objectInStoreMgr.findObject(data, store);
        if (obj != null) {
            State st = obj.getState();

            long milliSeconds = 10000;
            long timeoutSeconds = 3600;
            long timeoutMilliSeconds = timeoutSeconds * 1000;
            Date now = new Date();
            long expiredEpoch = now.getTime() + timeoutMilliSeconds;
            Date expiredDate = new Date(expiredEpoch);

            /*
             * Waiting for completion of cache copy.
             */
            while (st == ObjectInDataStoreStateMachine.State.Allocated ||
                    st == ObjectInDataStoreStateMachine.State.Creating ||
                    st == ObjectInDataStoreStateMachine.State.Copying) {

                /*
                 * Threads must release lock within waiting for cache copy and
                 * must be waken up at completion.
                 */
                logger.debug("waiting cache copy completion type: " + typeName + ", id: " + obj.getObjectId() + ", lock: " + lock.hashCode());
                try {
                    lock.wait(milliSeconds);
                } catch (InterruptedException e) {
                    // Deliberately ignored: the wait is resumed until the state changes or the timeout hits.
                    logger.debug("[ignored] interrupted while waiting for cache copy completion.");
                }
                logger.debug("waken up");

                now = new Date();
                if (now.after(expiredDate)) {
                    String msg = "Waiting time exceeds timeout limit(" + timeoutSeconds + " s)";
                    throw new CloudRuntimeException(msg);
                }

                obj = objectInStoreMgr.findObject(data, store);
                if (obj == null) {
                    // The entry is gone, so there is nothing left to wait for.
                    // Leave the loop and create a fresh entry below instead of
                    // dereferencing null.
                    logger.debug(typeName + " cache entry(id: " + dataId + ") on store(id: " + storeId + ") no longer exists, stop waiting");
                    st = null;
                    break;
                }
                st = obj.getState();
            }

            if (st == ObjectInDataStoreStateMachine.State.Ready) {
                logger.debug("there is already one in the cache store");
                DataObject dataObj = objectInStoreMgr.get(data, store, null);
                dataObj.incRefCount();
                existingDataObj = dataObj;
            }
        }

        if (existingDataObj == null) {
            logger.debug("create " + typeName + " cache entry(id: " + dataId + ") on store(id: " + storeId + ")");
            objOnCacheStore = store.create(data);
        }
        lock.notifyAll();
    }

    if (existingDataObj != null) {
        return existingDataObj;
    }
    if (objOnCacheStore == null) {
        logger.error("create " + typeName + " cache entry(id: " + dataId + ") on store(id: " + storeId + ") failed");
        return null;
    }

    AsyncCallFuture<CopyCommandResult> future = new AsyncCallFuture<CopyCommandResult>();
    CopyCommandResult result = null;
    // Remembered so the interrupt status can be restored after the cleanup below has run.
    boolean interrupted = false;
    try {
        objOnCacheStore.processEvent(Event.CreateOnlyRequested);

        dataMotionSvr.copyAsync(data, objOnCacheStore, future);
        result = future.get();

        if (result.isFailed()) {
            objOnCacheStore.processEvent(Event.OperationFailed);
        } else {
            objOnCacheStore.processEvent(Event.OperationSuccessed, result.getAnswer());
            objOnCacheStore.incRefCount();
            return objOnCacheStore;
        }
    } catch (InterruptedException e) {
        interrupted = true;
        logger.debug("create cache storage failed: " + e.toString());
        throw new CloudRuntimeException(e);
    } catch (ExecutionException e) {
        logger.debug("create cache storage failed: " + e.toString());
        throw new CloudRuntimeException(e);
    } finally {
        try {
            if (result == null) {
                // No result was obtained (exception before or during the copy): mark the entry failed.
                objOnCacheStore.processEvent(Event.OperationFailed);
            }
            synchronized (lock) {
                /*
                 * Wake up all threads waiting for cache copy.
                 */
                logger.debug("wake up all waiting threads(lock: " + lock.hashCode() + ")");
                lock.notifyAll();
            }
        } finally {
            if (interrupted) {
                // Restore the interrupt status swallowed by catching InterruptedException.
                Thread.currentThread().interrupt();
            }
        }
    }
    return null;
}
/**
 * Caches {@code data} on the cache store selected for {@code scope}.
 *
 * @param data  source object to cache
 * @param scope scope used to pick the cache store
 * @return the result of {@link #createCacheObject(DataObject, DataStore)} for the selected store
 * @throws CloudRuntimeException if no cache store is available for the scope
 */
@Override
public DataObject createCacheObject(DataObject data, Scope scope) {
    final DataStore target = getCacheStorage(scope);
    if (target != null) {
        return this.createCacheObject(data, target);
    }
    throw new CloudRuntimeException("No cache DataStore in scope id " + scope.getScopeId() + " type " + scope.getScopeType().toString());
}
/**
 * Obtains the object for {@code data} on the cache store selected for
 * {@code scope} and increments its reference count.
 *
 * The object is obtained through {@code DataStore.create(data)}; no copy is
 * started here.
 *
 * @param data  source object whose cache counterpart is wanted
 * @param scope scope used to pick the cache store
 * @return the object on the cache store, with its reference count incremented
 * @throws CloudRuntimeException if no cache store is available for the scope
 */
@Override
public DataObject getCacheObject(DataObject data, Scope scope) {
    DataStore cacheStore = getCacheStorage(scope);
    if (cacheStore == null) {
        // Fail with the same descriptive error as createCacheObject(DataObject, Scope)
        // instead of a bare NullPointerException.
        String errMsg = "No cache DataStore in scope id " + scope.getScopeId() + " type " + scope.getScopeType().toString();
        throw new CloudRuntimeException(errMsg);
    }
    DataObject objOnCacheStore = cacheStore.create(data);
    objOnCacheStore.incRefCount();
    return objOnCacheStore;
}
/**
 * Releases one reference to a cached object by decrementing its reference count.
 *
 * @param data cached object to release
 * @return always {@code true}
 */
@Override
public boolean releaseCacheObject(DataObject data) {
data.decRefCount();
return true;
}
/**
 * Deletes a cached object through the data store that owns it.
 *
 * @param data cached object to delete
 * @return whatever the owning store's {@code delete} call reports
 */
@Override
public boolean deleteCacheObject(DataObject data) {
    final DataStore owner = data.getDataStore();
    return owner.delete(data);
}
}