blob: 3c16644796eb6ff48d49f9ee965b1a962dfdd4b4 [file] [log] [blame]
/**
* Copyright (C) 2010 Cloud.com, Inc. All rights reserved.
*
* This software is licensed under the GNU General Public License v3 or later.
*
* It is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package com.cloud.server;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import com.cloud.agent.AgentManager;
import com.cloud.agent.AgentManager.OnError;
import com.cloud.agent.api.Answer;
import com.cloud.agent.api.GetFileStatsCommand;
import com.cloud.agent.api.GetStorageStatsCommand;
import com.cloud.agent.api.HostStatsEntry;
import com.cloud.agent.api.VmStatsEntry;
import com.cloud.agent.manager.Commands;
import com.cloud.capacity.CapacityVO;
import com.cloud.capacity.dao.CapacityDao;
import com.cloud.exception.AgentUnavailableException;
import com.cloud.host.Host;
import com.cloud.host.HostStats;
import com.cloud.host.HostVO;
import com.cloud.host.Status;
import com.cloud.host.dao.HostDao;
import com.cloud.storage.StorageManager;
import com.cloud.storage.StoragePoolHostVO;
import com.cloud.storage.StoragePoolVO;
import com.cloud.storage.StorageStats;
import com.cloud.storage.VolumeStats;
import com.cloud.storage.VolumeVO;
import com.cloud.storage.dao.StoragePoolDao;
import com.cloud.storage.dao.StoragePoolHostDao;
import com.cloud.storage.dao.VolumeDao;
import com.cloud.utils.NumbersUtil;
import com.cloud.utils.component.ComponentLocator;
import com.cloud.utils.concurrency.NamedThreadFactory;
import com.cloud.utils.db.SearchCriteria;
import com.cloud.utils.db.Transaction;
import com.cloud.vm.UserVmManager;
import com.cloud.vm.UserVmVO;
import com.cloud.vm.VmStats;
import com.cloud.vm.dao.UserVmDao;
/**
* Provides real time stats for various agent resources up to x seconds
*
* @author Will Chan
*
*/
public class StatsCollector {
public static final Logger s_logger = Logger.getLogger(StatsCollector.class.getName());
private static StatsCollector s_instance = null;
private ScheduledExecutorService _executor = null;
private final AgentManager _agentMgr;
private final UserVmManager _userVmMgr;
private final HostDao _hostDao;
private final UserVmDao _userVmDao;
private final VolumeDao _volsDao;
private final CapacityDao _capacityDao;
private final StoragePoolDao _storagePoolDao;
private final StorageManager _storageManager;
private final StoragePoolHostDao _storagePoolHostDao;
private ConcurrentHashMap<Long, HostStats> _hostStats = new ConcurrentHashMap<Long, HostStats>();
private final ConcurrentHashMap<Long, VmStats> _VmStats = new ConcurrentHashMap<Long, VmStats>();
private ConcurrentHashMap<Long, VolumeStats> _volumeStats = new ConcurrentHashMap<Long, VolumeStats>();
private ConcurrentHashMap<Long, StorageStats> _storageStats = new ConcurrentHashMap<Long, StorageStats>();
private ConcurrentHashMap<Long, StorageStats> _storagePoolStats = new ConcurrentHashMap<Long, StorageStats>();
long hostStatsInterval = -1L;
long hostAndVmStatsInterval = -1L;
long storageStatsInterval = -1L;
long volumeStatsInterval = -1L;
//private final GlobalLock m_capacityCheckLock = GlobalLock.getInternLock("capacity.check");
public static StatsCollector getInstance() {
return s_instance;
}
public static StatsCollector getInstance(Map<String, String> configs) {
if (s_instance == null) {
s_instance = new StatsCollector(configs);
}
return s_instance;
}
private StatsCollector(Map<String, String> configs) {
ComponentLocator locator = ComponentLocator.getLocator(ManagementServer.Name);
_agentMgr = locator.getManager(AgentManager.class);
_userVmMgr = locator.getManager(UserVmManager.class);
_hostDao = locator.getDao(HostDao.class);
_userVmDao = locator.getDao(UserVmDao.class);
_volsDao = locator.getDao(VolumeDao.class);
_capacityDao = locator.getDao(CapacityDao.class);
_storagePoolDao = locator.getDao(StoragePoolDao.class);
_storageManager = locator.getManager(StorageManager.class);
_storagePoolHostDao = locator.getDao(StoragePoolHostDao.class);
_executor = Executors.newScheduledThreadPool(3, new NamedThreadFactory("StatsCollector"));
hostStatsInterval = NumbersUtil.parseLong(configs.get("host.stats.interval"), 60000L);
hostAndVmStatsInterval = NumbersUtil.parseLong(configs.get("vm.stats.interval"), 60000L);
storageStatsInterval = NumbersUtil.parseLong(configs.get("storage.stats.interval"), 60000L);
volumeStatsInterval = NumbersUtil.parseLong(configs.get("volume.stats.interval"), -1L);
_executor.scheduleWithFixedDelay(new HostCollector(), 15000L, hostStatsInterval, TimeUnit.MILLISECONDS);
_executor.scheduleWithFixedDelay(new VmStatsCollector(), 15000L, hostAndVmStatsInterval, TimeUnit.MILLISECONDS);
_executor.scheduleWithFixedDelay(new StorageCollector(), 15000L, storageStatsInterval, TimeUnit.MILLISECONDS);
// -1 means we don't even start this thread to pick up any data.
if (volumeStatsInterval > 0) {
_executor.scheduleWithFixedDelay(new VolumeCollector(), 15000L, volumeStatsInterval, TimeUnit.MILLISECONDS);
} else {
s_logger.info("Disabling volume stats collector");
}
}
class HostCollector implements Runnable {
@Override
public void run() {
try {
s_logger.debug("HostStatsCollector is running...");
SearchCriteria<HostVO> sc = _hostDao.createSearchCriteria();
sc.addAnd("status", SearchCriteria.Op.EQ, Status.Up.toString());
sc.addAnd("type", SearchCriteria.Op.NEQ, Host.Type.Storage.toString());
sc.addAnd("type", SearchCriteria.Op.NEQ, Host.Type.ConsoleProxy.toString());
sc.addAnd("type", SearchCriteria.Op.NEQ, Host.Type.SecondaryStorage.toString());
ConcurrentHashMap<Long, HostStats> hostStats = new ConcurrentHashMap<Long, HostStats>();
List<HostVO> hosts = _hostDao.search(sc, null);
for (HostVO host : hosts)
{
HostStatsEntry stats = (HostStatsEntry) _agentMgr.getHostStatistics(host.getId());
if (stats != null)
{
hostStats.put(host.getId(), stats);
}
else
{
s_logger.warn("Received invalid host stats for host: " + host.getId());
}
}
_hostStats = hostStats;
}
catch (Throwable t)
{
s_logger.error("Error trying to retrieve host stats", t);
}
}
}
class VmStatsCollector implements Runnable {
@Override
public void run() {
try {
s_logger.debug("VmStatsCollector is running...");
SearchCriteria<HostVO> sc = _hostDao.createSearchCriteria();
sc.addAnd("status", SearchCriteria.Op.EQ, Status.Up.toString());
sc.addAnd("type", SearchCriteria.Op.NEQ, Host.Type.Storage.toString());
sc.addAnd("type", SearchCriteria.Op.NEQ, Host.Type.ConsoleProxy.toString());
sc.addAnd("type", SearchCriteria.Op.NEQ, Host.Type.SecondaryStorage.toString());
List<HostVO> hosts = _hostDao.search(sc, null);
for (HostVO host : hosts) {
List<UserVmVO> vms = _userVmDao.listRunningByHostId(host.getId());
List<Long> vmIds = new ArrayList<Long>();
for (UserVmVO vm : vms) {
vmIds.add(vm.getId());
}
try
{
HashMap<Long, VmStatsEntry> vmStatsById = _userVmMgr.getVirtualMachineStatistics(host.getId(), host.getName(), vmIds);
if(vmStatsById != null)
{
VmStatsEntry statsInMemory = null;
Set<Long> vmIdSet = vmStatsById.keySet();
for(Long vmId : vmIdSet)
{
VmStatsEntry statsForCurrentIteration = vmStatsById.get(vmId);
statsInMemory = (VmStatsEntry) _VmStats.get(vmId);
if(statsInMemory == null)
{
//no stats exist for this vm, directly persist
_VmStats.put(vmId, statsForCurrentIteration);
}
else
{
//update each field
statsInMemory.setCPUUtilization(statsForCurrentIteration.getCPUUtilization());
statsInMemory.setNumCPUs(statsForCurrentIteration.getNumCPUs());
statsInMemory.setNetworkReadKBs(statsInMemory.getNetworkReadKBs() + statsForCurrentIteration.getNetworkReadKBs());
statsInMemory.setNetworkWriteKBs(statsInMemory.getNetworkWriteKBs() + statsForCurrentIteration.getNetworkWriteKBs());
_VmStats.put(vmId, statsInMemory);
}
}
}
} catch (Exception e) {
s_logger.debug("Failed to get VM stats for host with ID: " + host.getId());
continue;
}
}
} catch (Throwable t) {
s_logger.error("Error trying to retrieve VM stats", t);
}
}
}
public VmStats getVmStats(long id) {
return _VmStats.get(id);
}
class StorageCollector implements Runnable {
@Override
public void run() {
try {
SearchCriteria<HostVO> sc = _hostDao.createSearchCriteria();
sc.addAnd("status", SearchCriteria.Op.EQ, Status.Up.toString());
sc.addAnd("type", SearchCriteria.Op.EQ, Host.Type.Storage.toString());
ConcurrentHashMap<Long, StorageStats> storageStats = new ConcurrentHashMap<Long, StorageStats>();
List<HostVO> hosts = _hostDao.search(sc, null);
for (HostVO host : hosts) {
GetStorageStatsCommand command = new GetStorageStatsCommand(host.getGuid());
Answer answer = _agentMgr.easySend(host.getId(), command);
if (answer != null && answer.getResult()) {
storageStats.put(host.getId(), (StorageStats)answer);
}
}
sc = _hostDao.createSearchCriteria();
sc.addAnd("status", SearchCriteria.Op.EQ, Status.Up.toString());
sc.addAnd("type", SearchCriteria.Op.EQ, Host.Type.SecondaryStorage.toString());
hosts = _hostDao.search(sc, null);
for (HostVO host : hosts) {
GetStorageStatsCommand command = new GetStorageStatsCommand(host.getGuid());
Answer answer = _agentMgr.easySend(host.getId(), command);
if (answer != null && answer.getResult()) {
storageStats.put(host.getId(), (StorageStats)answer);
}
}
_storageStats = storageStats;
ConcurrentHashMap<Long, StorageStats> storagePoolStats = new ConcurrentHashMap<Long, StorageStats>();
List<StoragePoolVO> storagePools = _storagePoolDao.listAll();
for (StoragePoolVO pool: storagePools) {
GetStorageStatsCommand command = new GetStorageStatsCommand(pool.getUuid(), pool.getPoolType(), pool.getPath());
Answer answer = _storageManager.sendToPool(pool, command);
if (answer != null && answer.getResult()) {
storagePoolStats.put(pool.getId(), (StorageStats)answer);
}
}
_storagePoolStats = storagePoolStats;
// a list to store the new capacity entries that will be committed once everything is calculated
List<CapacityVO> newCapacities = new ArrayList<CapacityVO>();
// Updating the storage entries and creating new ones if they dont exist.
Transaction txn = Transaction.open(Transaction.CLOUD_DB);
try {
if (s_logger.isTraceEnabled()) {
s_logger.trace("recalculating system storage capacity");
}
txn.start();
for (Long hostId : storageStats.keySet()) {
StorageStats stats = storageStats.get(hostId);
short capacityType = -1;
HostVO host = _hostDao.findById(hostId);
host.setTotalSize(stats.getCapacityBytes());
_hostDao.update(host.getId(), host);
SearchCriteria<CapacityVO> capacitySC = _capacityDao.createSearchCriteria();
capacitySC.addAnd("hostOrPoolId", SearchCriteria.Op.EQ, hostId);
capacitySC.addAnd("dataCenterId", SearchCriteria.Op.EQ, host.getDataCenterId());
if (Host.Type.SecondaryStorage.equals(host.getType())) {
capacityType = CapacityVO.CAPACITY_TYPE_SECONDARY_STORAGE;
} else if (Host.Type.Storage.equals(host.getType())) {
capacityType = CapacityVO.CAPACITY_TYPE_STORAGE;
}
if(-1 != capacityType){
capacitySC.addAnd("capacityType", SearchCriteria.Op.EQ, capacityType);
List<CapacityVO> capacities = _capacityDao.search(capacitySC, null);
if (capacities.size() == 0){ // Create a new one
CapacityVO capacity = new CapacityVO(host.getId(), host.getDataCenterId(), host.getPodId(), stats.getByteUsed(), stats.getCapacityBytes(), capacityType);
_capacityDao.persist(capacity);
}else{ //Update if it already exists.
CapacityVO capacity = capacities.get(0);
capacity.setUsedCapacity(stats.getByteUsed());
capacity.setTotalCapacity(stats.getCapacityBytes());
_capacityDao.update(capacity.getId(), capacity);
}
}
}// End of for
txn.commit();
} catch (Exception ex) {
txn.rollback();
s_logger.error("Unable to start transaction for storage capacity update");
}finally {
txn.close();
}
for (Long poolId : storagePoolStats.keySet()) {
StorageStats stats = storagePoolStats.get(poolId);
StoragePoolVO pool = _storagePoolDao.findById(poolId);
if (pool == null) {
continue;
}
pool.setCapacityBytes(stats.getCapacityBytes());
long available = stats.getCapacityBytes() - stats.getByteUsed();
if( available < 0 ) {
available = 0;
}
pool.setAvailableBytes(available);
_storagePoolDao.update(pool.getId(), pool);
_storageManager.createCapacityEntry(pool, 0L);
}
} catch (Throwable t) {
s_logger.error("Error trying to retrieve storage stats", t);
}
}
}
public StorageStats getStorageStats(long id) {
return _storageStats.get(id);
}
public HostStats getHostStats(long hostId){
return _hostStats.get(hostId);
}
public StorageStats getStoragePoolStats(long id) {
return _storagePoolStats.get(id);
}
class VolumeCollector implements Runnable {
@Override
public void run() {
try {
List<VolumeVO> volumes = _volsDao.listAll();
Map<Long, List<VolumeCommand>> commandsByPool = new HashMap<Long, List<VolumeCommand>>();
for (VolumeVO volume : volumes) {
List<VolumeCommand> commands = commandsByPool.get(volume.getPoolId());
if (commands == null) {
commands = new ArrayList<VolumeCommand>();
commandsByPool.put(volume.getPoolId(), commands);
}
VolumeCommand vCommand = new VolumeCommand();
vCommand.volumeId = volume.getId();
vCommand.command = new GetFileStatsCommand(volume);
commands.add(vCommand);
}
ConcurrentHashMap<Long, VolumeStats> volumeStats = new ConcurrentHashMap<Long, VolumeStats>();
for (Iterator<Long> iter = commandsByPool.keySet().iterator(); iter.hasNext();) {
Long poolId = iter.next();
List<VolumeCommand> commandsList = commandsByPool.get(poolId);
long[] volumeIdArray = new long[commandsList.size()];
Commands commands = new Commands(OnError.Continue);
for (int i = 0; i < commandsList.size(); i++) {
VolumeCommand vCommand = commandsList.get(i);
volumeIdArray[i] = vCommand.volumeId;
commands.addCommand(vCommand.command);
}
List<StoragePoolHostVO> poolhosts = _storagePoolHostDao.listByPoolId(poolId);
for(StoragePoolHostVO poolhost : poolhosts) {
Answer[] answers = _agentMgr.send(poolhost.getHostId(), commands);
if (answers != null) {
long totalBytes = 0L;
for (int i = 0; i < answers.length; i++) {
if (answers[i].getResult()) {
VolumeStats vStats = (VolumeStats)answers[i];
volumeStats.put(volumeIdArray[i], vStats);
totalBytes += vStats.getBytesUsed();
}
}
break;
}
}
}
// We replace the existing volumeStats so that it does not grow with no bounds
_volumeStats = volumeStats;
} catch (AgentUnavailableException e) {
s_logger.debug(e.getMessage());
} catch (Throwable t) {
s_logger.error("Error trying to retrieve volume stats", t);
}
}
}
private class VolumeCommand {
public long volumeId;
public GetFileStatsCommand command;
}
public VolumeStats[] getVolumeStats(long[] ids) {
VolumeStats[] stats = new VolumeStats[ids.length];
if (volumeStatsInterval > 0) {
for (int i = 0; i < ids.length; i++) {
stats[i] = _volumeStats.get(ids[i]);
}
}
return stats;
}
}