blob: 576ad9e55288e5a4230926cf68cd23ae095433c1 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import cPickle
import logging
import os
import threading
import random
import select
import signal
import socket
import subprocess
import sys
import time
# for scratch space support
from os import system
from tashi.rpycservices.rpyctypes import *
from tashi.util import broken, logged, scrubString, boolean
from tashi import version, stringPartition
from vmcontrolinterface import VmControlInterface
log = logging.getLogger(__file__)
def controlConsole(child, port):
"""This exposes a TCP port that connects to a particular child's monitor -- used for debugging"""
#print "controlConsole"
listenSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listenSocket.bind(("0.0.0.0", port))
#print "bound"
try:
try:
listenSocket.listen(5)
ls = listenSocket.fileno()
input = child.monitorFd
output = child.monitorFd
#print "listen"
select.select([ls], [], [])
(s, clientAddr) = listenSocket.accept()
while s:
if (output != -1):
(rl, wl, el) = select.select([s, output], [], [])
else:
(rl, wl, el) = select.select([s], [], [])
if (len(rl) > 0):
if (rl[0] == s):
#print "from s"
buf = s.recv(4096)
if (buf == ""):
s.close()
listenSocket.close()
s = None
continue
if (output != -1):
os.write(child.monitorFd, buf)
elif (rl[0] == output):
#print "from output"
buf = os.read(output, 4096)
#print "read complete"
if (buf == ""):
output = -1
else:
s.send(buf)
except:
s.close()
listenSocket.close()
finally:
#print "Thread exiting"
pass
class Qemu(VmControlInterface):
"""This class implements the VmControlInterface for Qemu/KVM"""
def __init__(self, config, dfs, nm):
VmControlInterface.__init__(self, config, dfs, nm)
self.QEMU_BIN = self.config.get("Qemu", "qemuBin")
self.INFO_DIR = self.config.get("Qemu", "infoDir")
self.POLL_DELAY = float(self.config.get("Qemu", "pollDelay"))
self.migrationRetries = int(self.config.get("Qemu", "migrationRetries"))
self.monitorTimeout = float(self.config.get("Qemu", "monitorTimeout"))
self.migrateTimeout = float(self.config.get("Qemu", "migrateTimeout"))
self.useMigrateArgument = boolean(self.config.get("Qemu", "useMigrateArgument"))
self.statsInterval = float(self.config.get("Qemu", "statsInterval"))
self.controlledVMs = {}
self.usedPorts = []
self.usedPortsLock = threading.Lock()
self.vncPorts = []
self.vncPortLock = threading.Lock()
self.consolePort = 10000
self.consolePortLock = threading.Lock()
self.migrationSemaphore = threading.Semaphore(int(self.config.get("Qemu", "maxParallelMigrations")))
self.stats = {}
self.scratchVg = self.config.get("Qemu", "scratchVg")
# XXXstroucki revise
self.scratchDir = self.config.get("Qemu", "scratchDir")
if len(self.scratchDir) == 0:
self.scratchDir = "/tmp"
try:
os.mkdir(self.INFO_DIR)
except:
pass
self.scanInfoDir()
threading.Thread(target=self.pollVMsLoop).start()
if (self.statsInterval > 0):
threading.Thread(target=self.statsThread).start()
class anonClass:
def __init__(self, **attrs):
self.__dict__.update(attrs)
def getSystemPids(self):
"""Utility function to get a list of system PIDs that match the QEMU_BIN specified (/proc/nnn/exe)"""
pids = []
for f in os.listdir("/proc"):
try:
bin = os.readlink("/proc/%s/exe" % (f))
if (bin.find(self.QEMU_BIN) != -1):
pids.append(int(f))
except Exception:
pass
return pids
def getInstances(self):
"""Will return a dict of instances by vmId to the caller"""
return dict((x, self.controlledVMs[x].instance) for x in self.controlledVMs.keys())
def setInstance(self, instance):
"""Sets data in an instance object"""
vmId = instance.vmId
child = self.getChildFromPid(vmId)
child.instance = instance
self.saveChildInfo(child)
def matchSystemPids(self, controlledVMs):
"""This is run in a separate polling thread and it must do things that are thread safe"""
if self.nm is None:
#XXXstroucki log may not be there yet either
#self.log.info("NM hook not yet available")
return
vmIds = controlledVMs.keys()
pids = self.getSystemPids()
for vmId in vmIds:
child = controlledVMs[vmId]
if vmId not in pids:
os.unlink(self.INFO_DIR + "/%d"%(vmId))
del controlledVMs[vmId]
try:
del self.stats[vmId]
except:
pass
if (child.vncPort >= 0):
self.vncPortLock.acquire()
self.vncPorts.remove(child.vncPort)
self.vncPortLock.release()
log.info("Removing vmId %d" % (vmId))
if (child.OSchild):
try:
os.waitpid(vmId, 0)
except:
log.exception("waitpid failed")
if (child.errorBit):
if (child.OSchild):
f = open("/tmp/%d.err" % (vmId), "w")
f.write(child.stderr.read())
f.close()
f = open("/tmp/%d.pty" % (vmId), "w")
for i in child.monitorHistory:
f.write(i)
f.close()
#XXXstroucki remove scratch storage
try:
if self.scratchVg is not None:
scratch_name = child.instance.name
log.info("Removing scratch for " + scratch_name)
cmd = "/sbin/lvremove -f %s" % self.scratchVg
result = subprocess.Popen(cmd.split(), executable=cmd.split()[0], stdout=subprocess.PIPE).wait()
except:
pass
try:
if (not child.migratingOut):
self.nm.vmStateChange(vmId, None, InstanceState.Exited)
except Exception, e:
log.exception("vmStateChange failed")
else:
try:
if (child.migratingOut):
self.nm.vmStateChange(vmId, None, InstanceState.MigrateTrans)
else:
self.nm.vmStateChange(vmId, None, InstanceState.Running)
except:
#XXXstroucki nm is initialised at different time
log.exception("vmStateChange failed")
def scanInfoDir(self):
"""This is not thread-safe and must only be used during class initialization"""
controlledVMs = {}
controlledVMs.update(map(lambda x: (int(x), self.anonClass(OSchild=False, errorBit=False, migratingOut=False)), os.listdir(self.INFO_DIR + "/")))
if (len(controlledVMs) == 0):
log.info("No vm information found in %s", self.INFO_DIR)
for vmId in controlledVMs:
try:
child = self.loadChildInfo(vmId)
self.vncPortLock.acquire()
if (child.vncPort >= 0):
self.vncPorts.append(child.vncPort)
self.vncPortLock.release()
child.monitorFd = os.open(child.ptyFile, os.O_RDWR | os.O_NOCTTY)
child.monitor = os.fdopen(child.monitorFd)
#XXXstroucki ensure instance has vmId
child.instance.vmId = vmId
self.controlledVMs[child.pid] = child
log.info("Adding vmId %d" % (child.pid))
except Exception, e:
log.exception("Failed to load VM info for %d", vmId)
else:
log.info("Loaded VM info for %d", vmId)
# XXXstroucki NM may not be available yet here.
self.matchSystemPids(self.controlledVMs)
def pollVMsLoop(self):
"""Infinite loop that checks for dead VMs"""
while True:
try:
time.sleep(self.POLL_DELAY)
self.matchSystemPids(self.controlledVMs)
except:
log.exception("Exception in poolVMsLoop")
def waitForExit(self, vmId):
"""This waits until an element is removed from the dictionary -- the polling thread must detect an exit"""
while vmId in self.controlledVMs:
time.sleep(self.POLL_DELAY)
def getChildFromPid(self, pid):
"""Do a simple dictionary lookup, but raise a unique exception if the key doesn't exist"""
child = self.controlledVMs.get(pid, None)
if (not child):
raise Exception, "Uncontrolled vmId %d" % (pid)
return child
def consumeAvailable(self, child):
"""Consume characters one-by-one until they stop coming"""
monitorFd = child.monitorFd
buf = ""
try:
(rlist, wlist, xlist) = select.select([monitorFd], [], [], 0.0)
while (len(rlist) > 0):
c = os.read(monitorFd, 1)
if (c == ""):
log.error("Early termination on monitor for vmId %d" % (child.pid))
child.errorBit = True
raise RuntimeError
buf = buf + c
(rlist, wlist, xlist) = select.select([monitorFd], [], [], 0.0)
finally:
child.monitorHistory.append(buf)
return buf
def consumeUntil(self, child, needle, timeout = -1):
"""Consume characters one-by-one until something specific comes up"""
if (timeout == -1):
timeout = self.monitorTimeout
monitorFd = child.monitorFd
buf = " " * len(needle)
try:
while (buf[-(len(needle)):] != needle):
#print "[BUF]: %s" % (buf)
#print "[NEE]: %s" % (needle)
(rlist, wlist, xlist) = select.select([monitorFd], [], [], timeout)
if (len(rlist) == 0):
log.error("Timeout getting results from monitor for vmId %d" % (child.pid))
child.errorBit = True
raise RuntimeError
c = os.read(monitorFd, 1)
if (c == ""):
log.error("Early termination on monitor for vmId %d" % (child.pid))
child.errorBit = True
raise RuntimeError
buf = buf + c
finally:
child.monitorHistory.append(buf[len(needle):])
return buf[len(needle):]
def enterCommand(self, child, command, expectPrompt = True, timeout = -1):
"""Enter a command on the qemu monitor"""
res = self.consumeAvailable(child)
os.write(child.monitorFd, command + "\n")
if (expectPrompt):
self.consumeUntil(child, command, timeout=timeout)
res = self.consumeUntil(child, "(qemu) ", timeout=timeout)
return res
def loadChildInfo(self, vmId):
child = self.anonClass(pid=vmId)
info = open(self.INFO_DIR + "/%d"%(child.pid), "r")
(instance, pid, ptyFile) = cPickle.load(info)
info.close()
if (pid != child.pid):
raise Exception, "PID mismatch"
child.instance = instance
child.pid = pid
child.ptyFile = ptyFile
if ('monitorHistory' not in child.__dict__):
child.monitorHistory = []
if ('OSchild' not in child.__dict__):
child.OSchild = False
if ('errorBit' not in child.__dict__):
child.errorBit = False
if ('migratingOut' not in child.__dict__):
child.migratingOut = False
if ('vncPort' not in child.__dict__):
child.vncPort = -1
return child
def saveChildInfo(self, child):
info = open(self.INFO_DIR + "/%d"%(child.pid), "w")
cPickle.dump((child.instance, child.pid, child.ptyFile), info)
info.close()
def getHostInfo(self, service):
host = Host()
host.id = service.id
host.name = socket.gethostname()
cmd = "head -n 1 /proc/meminfo"
memoryStr = subprocess.Popen(cmd.split(), executable=cmd.split()[0], stdout=subprocess.PIPE).stdout.read().strip().split()
if (memoryStr[2] == "kB"):
# XXXstroucki should have parameter for reserved mem
host.memory = (int(memoryStr[1])/1024) - 512
else:
log.warning('Unable to determine amount of physical memory - reporting 0')
host.memory = 0
host.cores = os.sysconf("SC_NPROCESSORS_ONLN")
host.up = True
host.decayed = False
host.version = version
return host
def startVm(self, instance, source):
"""Universal function to start a VM -- used by instantiateVM, resumeVM, and prepReceiveVM"""
# Capture startVm Hints
# CPU hints
cpuModel = instance.hints.get("cpumodel")
cpuString = ""
if cpuModel:
cpuString = "-cpu " + cpuModel
# Clock hints
clockString = instance.hints.get("clock", "dynticks")
# Disk hints
diskInterface = instance.hints.get("diskInterface", "ide")
diskString = ""
for index in range(0, len(instance.disks)):
disk = instance.disks[index]
uri = scrubString(disk.uri)
imageLocal = self.dfs.getLocalHandle("images/" + uri)
thisDiskList = [ "file=%s" % imageLocal ]
thisDiskList.append("if=%s" % diskInterface)
thisDiskList.append("index=%d" % index)
if (index == 0 and diskInterface == "virtio"):
thisDiskList.append("boot=on")
if (disk.persistent):
snapshot = "off"
migrate = "off"
else:
snapshot = "on"
migrate = "on"
thisDiskList.append("cache=off")
thisDiskList.append("snapshot=%s" % snapshot)
if (self.useMigrateArgument):
thisDiskList.append("migrate=%s" % migrate)
diskString = diskString + "-drive " + ",".join(thisDiskList) + " "
# scratch disk (should be integrated better)
scratchSize = instance.hints.get("scratchSpace", "0")
scratchSize = int(scratchSize)
scratch_file = None
try:
if scratchSize > 0:
if self.scratchVg is None:
raise Exception, "No scratch volume group defined"
# create scratch disk
# XXXstroucki: needs to be cleaned somewhere
# XXXstroucki: clean user provided instance name
scratch_name = "lv" + instance.name
# XXXstroucki hold lock
# XXXstroucki check for capacity
cmd = "/sbin/lvcreate -n" + scratch_name + " -L" + str(scratchSize) + "G " + self.scratchVg
result = subprocess.Popen(cmd.split(), executable=cmd.split()[0], stdout=subprocess.PIPE).wait()
index += 1
thisDiskList = [ "file=/dev/%s/%s" % (self.scratchVg, scratch_name) ]
thisDiskList.append("if=%s" % diskInterface)
thisDiskList.append("index=%d" % index)
thisDiskList.append("cache=off")
if (True or disk.persistent):
snapshot = "off"
migrate = "off"
else:
snapshot = "on"
migrate = "on"
thisDiskList.append("snapshot=%s" % snapshot)
if (self.useMigrateArgument):
thisDiskList.append("migrate=%s" % migrate)
diskString = diskString + "-drive " + ",".join(thisDiskList) + " "
except:
print 'caught exception'
raise 'exception'
# Nic hints
nicModel = instance.hints.get("nicModel", "virtio")
nicString = ""
for i in range(0, len(instance.nics)):
nic = instance.nics[i]
nicString = nicString + "-net nic,macaddr=%s,model=%s,vlan=%d -net tap,ifname=tashi%d.%d,vlan=%d,script=/etc/qemu-ifup.%d " % (nic.mac, nicModel, nic.network, instance.id, i, nic.network, nic.network)
# ACPI
if (boolean(instance.hints.get("noAcpi", False))):
noAcpiString = "-no-acpi"
else:
noAcpiString = ""
# Construct the qemu command
strCmd = "%s %s %s -clock %s %s %s -m %d -smp %d -serial null -vnc none -monitor pty" % (self.QEMU_BIN, noAcpiString, cpuString, clockString, diskString, nicString, instance.memory, instance.cores)
cmd = strCmd.split()
if (source):
cmd = cmd + ["-incoming", source]
strCmd = strCmd + " -incoming %s" % (source)
log.info("QEMU command: %s" % (strCmd))
(pipe_r, pipe_w) = os.pipe()
pid = os.fork()
if (pid == 0):
pid = os.getpid()
os.setpgid(pid, pid)
os.close(pipe_r)
os.dup2(pipe_w, sys.stderr.fileno())
for i in [sys.stdin.fileno(), sys.stdout.fileno()]:
try:
os.close(i)
except:
pass
for i in xrange(3, os.sysconf("SC_OPEN_MAX")):
try:
os.close(i)
except:
pass
# XXXstroucki unfortunately no kvm option yet
os.environ['TMPDIR'] = self.scratchDir
os.execl(self.QEMU_BIN, *cmd)
sys.exit(-1)
os.close(pipe_w)
child = self.anonClass(pid=pid, instance=instance, stderr=os.fdopen(pipe_r, 'r'), migratingOut = False, monitorHistory=[], errorBit = True, OSchild = True)
child.ptyFile = None
child.vncPort = -1
child.instance.vmId = child.pid
self.saveChildInfo(child)
self.controlledVMs[child.pid] = child
log.info("Adding vmId %d" % (child.pid))
return (child.pid, cmd)
def getPtyInfo(self, child, issueContinue):
ptyFile = None
while not ptyFile:
l = child.stderr.readline()
if (l == ""):
try:
os.waitpid(child.pid, 0)
except:
log.exception("waitpid failed")
raise Exception, "Failed to start VM -- ptyFile not found"
if (l.find("char device redirected to ") != -1):
ptyFile=l[26:].strip()
break
child.ptyFile = ptyFile
child.monitorFd = os.open(child.ptyFile, os.O_RDWR | os.O_NOCTTY)
child.monitor = os.fdopen(child.monitorFd)
self.saveChildInfo(child)
if (issueContinue):
#XXXstroucki: receiving a vm can take a long time
self.enterCommand(child, "c", timeout=None)
def stopVm(self, vmId, target, stopFirst):
"""Universal function to stop a VM -- used by suspendVM, migrateVM """
child = self.getChildFromPid(vmId)
if (stopFirst):
self.enterCommand(child, "stop")
if (target):
retry = self.migrationRetries
while (retry > 0):
res = self.enterCommand(child, "migrate -i %s" % (target), timeout=self.migrateTimeout)
retry = retry - 1
if (res.find("migration failed") == -1):
retry = -1
else:
log.error("Migration (transiently) failed: %s\n", res)
if (retry == 0):
log.error("Migration failed: %s\n", res)
child.errorBit = True
raise RuntimeError
self.enterCommand(child, "quit", expectPrompt=False)
return vmId
def instantiateVm(self, instance):
(vmId, cmd) = self.startVm(instance, None)
child = self.getChildFromPid(vmId)
self.getPtyInfo(child, False)
child.cmd = cmd
self.saveChildInfo(child)
return vmId
def suspendVm(self, vmId, target):
child = self.getChildFromPid(vmId)
tmpTarget = "/tmp/tashi_qemu_suspend_%d_%d" % (os.getpid(), vmId)
# XXX: Use fifo to improve performance
vmId = self.stopVm(vmId, "\"exec:gzip -c > %s\"" % (tmpTarget), True)
self.dfs.copyTo(tmpTarget, target)
return vmId
def resumeVmHelper(self, instance, source):
child = self.getChildFromPid(instance.vmId)
try:
self.getPtyInfo(child, True)
except RuntimeError, e:
log.error("Failed to get pty info -- VM likely died")
child.errorBit = True
raise
status = "paused"
while ("running" not in status):
status = self.enterCommand(child, "info status")
time.sleep(1)
def resumeVm(self, instance, source):
fn = self.dfs.getLocalHandle("%s" % (source))
(vmId, cmd) = self.startVm(instance, "exec:zcat %s" % (fn))
child = self.getChildFromPid(vmId)
child.cmd = cmd
self.saveChildInfo(child)
return vmId
def prepReceiveVm(self, instance, source):
self.usedPortsLock.acquire()
port = int(random.random()*1000+19000)
while port in self.usedPorts:
port = int(random.random()*1000+19000)
self.usedPorts.append(port)
self.usedPortsLock.release()
(vmId, cmd) = self.startVm(instance, "tcp:0.0.0.0:%d" % (port))
transportCookie = cPickle.dumps((port, vmId, socket.gethostname()))
child = self.getChildFromPid(vmId)
child.cmd = cmd
child.transportCookie = transportCookie
self.saveChildInfo(child)
# XXX: Cleanly wait until the port is open
lc = 0
while (lc < 1):
# XXXpipe: find whether something is listening yet on the port
(stdin, stdout) = os.popen2("netstat -ln | grep 0.0.0.0:%d | wc -l" % (port))
stdin.close()
r = stdout.read()
lc = int(r.strip())
if (lc < 1):
time.sleep(1.0)
return transportCookie
def migrateVm(self, vmId, target, transportCookie):
self.migrationSemaphore.acquire()
try:
(port, _vmId, _hostname) = cPickle.loads(transportCookie)
child = self.getChildFromPid(vmId)
child.migratingOut = True
res = self.stopVm(vmId, "tcp:%s:%d" % (target, port), False)
# XXX: Some sort of feedback would be nice
# XXX: Should we block?
self.waitForExit(vmId)
finally:
self.migrationSemaphore.release()
return res
def receiveVm(self, transportCookie):
(port, vmId, _hostname) = cPickle.loads(transportCookie)
try:
child = self.getChildFromPid(vmId)
except:
log.error("Failed to get child info; transportCookie = %s; hostname = %s" % (str(cPickle.loads(transportCookie)), socket.hostname()))
raise
try:
self.getPtyInfo(child, True)
except RuntimeError, e:
log.error("Failed to get pty info -- VM likely died")
child.errorBit = True
raise
self.usedPortsLock.acquire()
self.usedPorts = filter(lambda _port: _port != port, self.usedPorts)
self.usedPortsLock.release()
return vmId
def pauseVm(self, vmId):
child = self.getChildFromPid(vmId)
self.enterCommand(child, "stop")
def unpauseVm(self, vmId):
child = self.getChildFromPid(vmId)
self.enterCommand(child, "c")
def shutdownVm(self, vmId):
"""'system_powerdown' doesn't seem to actually shutdown the VM on some versions of KVM with some versions of Linux"""
child = self.getChildFromPid(vmId)
self.enterCommand(child, "system_powerdown")
def destroyVm(self, vmId):
child = self.getChildFromPid(vmId)
child.migratingOut = False
# XXX: the child could have exited between these two points, but I don't know how to fix that since it might not be our child process
os.kill(child.pid, signal.SIGKILL)
def vmmSpecificCall(self, vmId, arg):
arg = arg.lower()
if (arg == "startvnc"):
child = self.getChildFromPid(vmId)
hostname = socket.gethostname()
if (child.vncPort == -1):
self.vncPortLock.acquire()
port = 0
while (port in self.vncPorts):
port = port + 1
self.vncPorts.append(port)
self.vncPortLock.release()
self.enterCommand(child, "change vnc :%d" % (port))
child.vncPort = port
self.saveChildInfo(child)
port = child.vncPort
return "VNC started on %s:%d" % (hostname, port+5900)
elif (arg == "stopvnc"):
child = self.getChildFromPid(vmId)
self.enterCommand(child, "change vnc none")
if (child.vncPort != -1):
self.vncPortLock.acquire()
self.vncPorts.remove(child.vncPort)
self.vncPortLock.release()
child.vncPort = -1
self.saveChildInfo(child)
return "VNC halted"
elif (arg.startswith("changecdrom:")):
child = self.getChildFromPid(vmId)
iso = scrubString(arg[12:])
imageLocal = self.dfs.getLocalHandle("images/" + iso)
self.enterCommand(child, "change ide1-cd0 %s" % (imageLocal))
return "Changed ide1-cd0 to %s" % (iso)
elif (arg == "startconsole"):
child = self.getChildFromPid(vmId)
hostname = socket.gethostname()
self.consolePortLock.acquire()
consolePort = self.consolePort
self.consolePort = self.consolePort+1
self.consolePortLock.release()
threading.Thread(target=controlConsole, args=(child,consolePort)).start()
return "Control console listenting on %s:%d" % (hostname, consolePort)
elif (arg == "list"):
return "startVnc\nstopVnc\nchangeCdrom:<image.iso>\nstartConsole"
else:
return "Unknown arg %s" % (arg)
def listVms(self):
return self.controlledVMs.keys()
def statsThread(self):
ticksPerSecond = float(os.sysconf('SC_CLK_TCK'))
netStats = {}
cpuStats = {}
last = time.time() - self.statsInterval
while True:
now = time.time()
try:
f = open("/proc/net/dev")
netData = f.readlines()
f.close()
for l in netData:
if (l.find("tashi") != -1):
(dev, sep, ld) = stringPartition(l, ":")
dev = dev.strip()
ws = ld.split()
recvBytes = float(ws[0])
sendBytes = float(ws[8])
(recvMBs, sendMBs, lastRecvBytes, lastSendBytes) = netStats.get(dev, (0.0, 0.0, recvBytes, sendBytes))
if (recvBytes < lastRecvBytes):
if (lastRecvBytes > 2**32):
lastRecvBytes = lastRecvBytes - 2**64
else:
lastRecvBytes = lastRecvBytes - 2**32
if (sendBytes < lastSendBytes):
if (lastSendBytes > 2**32):
lastSendBytes = lastSendBytes - 2**64
else:
lastSendBytes = lastSendBytes - 2**32
recvMBs = (recvBytes-lastRecvBytes)/(now-last)/1024.0/1024.0
sendMBs = (sendBytes-lastSendBytes)/(now-last)/1024.0/1024.0
netStats[dev] = (recvMBs, sendMBs, recvBytes, sendBytes)
for vmId in self.controlledVMs:
f = open("/proc/%d/stat" % (vmId))
procData = f.read()
f.close()
ws = procData.strip().split()
userTicks = float(ws[13])
sysTicks = float(ws[14])
myTicks = userTicks + sysTicks
vsize = (int(ws[22]))/1024.0/1024.0
rss = (int(ws[23])*4096)/1024.0/1024.0
cpuSeconds = myTicks/ticksPerSecond
lastCpuSeconds = cpuStats.get(vmId, cpuSeconds)
cpuLoad = (cpuSeconds - lastCpuSeconds)/(now - last)
cpuStats[vmId] = cpuSeconds
child = self.controlledVMs[vmId]
(recvMBs, sendMBs, recvBytes, sendBytes) = (0.0, 0.0, 0.0, 0.0)
for i in range(0, len(child.instance.nics)):
netDev = "tashi%d.%d" % (child.instance.id, i)
(tmpRecvMBs, tmpSendMBs, tmpRecvBytes, tmpSendBytes) = netStats.get(netDev, (0.0, 0.0, 0.0, 0.0))
(recvMBs, sendMBs, recvBytes, sendBytes) = (recvMBs + tmpRecvMBs, sendMBs + tmpSendMBs, recvBytes + tmpRecvBytes, sendBytes + tmpSendBytes)
self.stats[vmId] = self.stats.get(vmId, {})
child = self.controlledVMs.get(vmId, None)
if (child):
res = self.enterCommand(child, "info blockstats")
for l in res.split("\n"):
(device, sep, data) = stringPartition(l, ": ")
if (data != ""):
for field in data.split(" "):
(label, sep, val) = stringPartition(field, "=")
if (val != ""):
self.stats[vmId]['%s_%s_per_s' % (device, label)] = (float(val) - float(self.stats[vmId].get('%s_%s' % (device, label), 0)))/self.statsInterval
self.stats[vmId]['%s_%s' % (device, label)] = int(val)
self.stats[vmId]['cpuLoad'] = cpuLoad
self.stats[vmId]['rss'] = rss
self.stats[vmId]['recvMBs'] = sendMBs
self.stats[vmId]['sendMBs'] = recvMBs
except:
log.exception("statsThread threw an exception")
last = now
time.sleep(self.statsInterval)
def getStats(self, vmId):
return self.stats.get(vmId, {})