remove old branch stablefix
git-svn-id: https://svn.apache.org/repos/asf/incubator/tashi/branches/cmu@1203836 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/src/tashi/agents/dhcpdns.py b/src/tashi/agents/dhcpdns.py
index d54f80f..a1741e1 100644
--- a/src/tashi/agents/dhcpdns.py
+++ b/src/tashi/agents/dhcpdns.py
@@ -28,7 +28,8 @@
class DhcpDns(InstanceHook):
def __init__(self, config, client, post=False):
InstanceHook.__init__(self, config, client, post)
- self.dnsKeyFile = self.config.get('DhcpDns', 'dnsKeyFile')
+ self.dnsKeyName = self.config.get('DhcpDns', 'dnsKeyName')
+ self.dnsSecretKey = self.config.get('DhcpDns', 'dnsSecretKey')
self.dnsServer = self.config.get('DhcpDns', 'dnsServer')
self.dnsDomain = self.config.get('DhcpDns', 'dnsDomain')
self.dnsExpire = int(self.config.get('DhcpDns', 'dnsExpire'))
@@ -153,14 +154,12 @@
self.removeDns(name)
except:
pass
- if (self.dnsKeyFile != ""):
- cmd = "nsupdate -k %s" % (self.dnsKeyFile)
- else:
- cmd = "nsupdate"
+ cmd = "nsupdate"
child = subprocess.Popen(args=cmd.split(), stdin=subprocess.PIPE, stdout=subprocess.PIPE)
try:
(stdin, stdout) = (child.stdin, child.stdout)
stdin.write("server %s\n" % (self.dnsServer))
+ stdin.write("key %s %s\n" % (self.dnsKeyName, self.dnsSecretKey))
stdin.write("update add %s.%s %d A %s\n" % (name, self.dnsDomain, self.dnsExpire, ip))
stdin.write("\n")
if (self.reverseDns):
@@ -181,14 +180,12 @@
(pid, status) = os.waitpid(child.pid, os.WNOHANG)
def removeDns(self, name):
- if (self.dnsKeyFile != ""):
- cmd = "nsupdate -k %s" % (self.dnsKeyFile)
- else:
- cmd = "nsupdate"
+ cmd = "nsupdate"
child = subprocess.Popen(args=cmd.split(), stdin=subprocess.PIPE, stdout=subprocess.PIPE)
try:
(stdin, stdout) = (child.stdin, child.stdout)
stdin.write("server %s\n" % (self.dnsServer))
+ stdin.write("key %s %s\n" % (self.dnsKeyName, self.dnsSecretKey))
if (self.reverseDns):
ip = socket.gethostbyname(name)
ipSegments = map(int, ip.split("."))
diff --git a/src/tashi/agents/locality-server.py b/src/tashi/agents/locality-server.py
index 9a5ca21..ac835ed 100755
--- a/src/tashi/agents/locality-server.py
+++ b/src/tashi/agents/locality-server.py
@@ -1,5 +1,4 @@
#!/usr/bin/python
-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
diff --git a/src/tashi/agents/primitive.py b/src/tashi/agents/primitive.py
index b86624f..189a3bc 100755
--- a/src/tashi/agents/primitive.py
+++ b/src/tashi/agents/primitive.py
@@ -146,6 +146,11 @@
# XXXstroucki if it's down, find another machine
if (h.up == False):
continue
+
+			# If the host is not in a normal operating state,
+			# find another machine
+ if (h.state != HostState.Normal):
+ continue
# if it's reserved, see if we can use it
if ((len(h.reserved) > 0) and inst.userId not in h.reserved):
diff --git a/src/tashi/clustermanager/clustermanager.py b/src/tashi/clustermanager/clustermanager.py
index 8761af1..379fe29 100755
--- a/src/tashi/clustermanager/clustermanager.py
+++ b/src/tashi/clustermanager/clustermanager.py
@@ -30,7 +30,7 @@
from tashi.rpycservices import rpycservices
from rpyc.utils.server import ThreadedServer
-from rpyc.utils.authenticators import VdbAuthenticator
+from rpyc.utils.authenticators import TlsliteVdbAuthenticator
def startClusterManager(config):
global service, data
@@ -47,7 +47,7 @@
users[user.name] = user.passwd
users[config.get('AllowedUsers', 'nodeManagerUser')] = config.get('AllowedUsers', 'nodeManagerPassword')
users[config.get('AllowedUsers', 'agentUser')] = config.get('AllowedUsers', 'agentPassword')
- authenticator = VdbAuthenticator.from_dict(users)
+ authenticator = TlsliteVdbAuthenticator.from_dict(users)
t = ThreadedServer(service=rpycservices.ManagerService, hostname='0.0.0.0', port=int(config.get('ClusterManagerService', 'port')), auto_register=False, authenticator=authenticator)
else:
t = ThreadedServer(service=rpycservices.ManagerService, hostname='0.0.0.0', port=int(config.get('ClusterManagerService', 'port')), auto_register=False)
diff --git a/src/tashi/clustermanager/clustermanagerservice.py b/src/tashi/clustermanager/clustermanagerservice.py
index c009879..55dac62 100644
--- a/src/tashi/clustermanager/clustermanagerservice.py
+++ b/src/tashi/clustermanager/clustermanagerservice.py
@@ -72,11 +72,19 @@
threading.Thread(target=self.monitorCluster).start()
def stateTransition(self, instance, old, cur):
- if cur == InstanceState.Running:
- self.log.exception("%d was made running here" % instance.id)
if (old and instance.state != old):
raise TashiException(d={'errno':Errors.IncorrectVmState,'msg':"VmState is not %s - it is %s" % (vmStates[old], vmStates[instance.state])})
+ if (instance.state == cur):
+ return
+
instance.state = cur
+ try:
+ host = self.data.getHost(instance.hostId)
+ vmId = instance.vmId
+ self.proxy[host.name].vmStateChange(vmId, old, cur)
+ except:
+ #XXXstroucki append to a list?
+ pass
def __now(self):
return time.time()
@@ -341,6 +349,8 @@
except:
self.data.releaseInstance(instance)
raise
+ from pprint import pformat
+ self.log.info("A: %s" % (pformat(instance)))
self.stateTransition(instance, InstanceState.Running, InstanceState.MigratePrep)
self.data.releaseInstance(instance)
try:
@@ -349,11 +359,14 @@
self.proxy[sourceHost.name].prepSourceVm(instance.vmId)
self.log.info("migrateVm: Calling prepReceiveVm on target host %s" % targetHost.name)
cookie = self.proxy[targetHost.name].prepReceiveVm(instance, sourceHost)
+ self.log.info("Debug: here")
except Exception, e:
self.log.exception('prepReceiveVm failed')
raise
instance = self.data.acquireInstance(instance.id)
+ self.log.info("B: %s" % (pformat(instance)))
self.stateTransition(instance, InstanceState.MigratePrep, InstanceState.MigrateTrans)
+ self.log.info("C: %s" % (pformat(instance)))
self.data.releaseInstance(instance)
try:
# Send the VM
@@ -370,6 +383,7 @@
try:
# Notify the target
vmId = self.proxy[targetHost.name].receiveVm(instance, cookie)
+ self.log.info("D: %s notified" % targetHost.name)
except Exception, e:
self.log.exception('receiveVm failed')
raise
diff --git a/src/tashi/nodemanager/nodemanager.py b/src/tashi/nodemanager/nodemanager.py
index 959624c..166262d 100755
--- a/src/tashi/nodemanager/nodemanager.py
+++ b/src/tashi/nodemanager/nodemanager.py
@@ -28,7 +28,7 @@
from tashi.rpycservices import rpycservices
from rpyc.utils.server import ThreadedServer
-from rpyc.utils.authenticators import VdbAuthenticator
+from rpyc.utils.authenticators import TlsliteVdbAuthenticator
@signalHandler(signal.SIGTERM)
def handleSIGTERM(signalNumber, stackFrame):
@@ -51,7 +51,7 @@
if boolean(config.get("Security", "authAndEncrypt")):
users = {}
users[config.get('AllowedUsers', 'clusterManagerUser')] = config.get('AllowedUsers', 'clusterManagerPassword')
- authenticator = VdbAuthenticator.from_dict(users)
+ authenticator = TlsliteVdbAuthenticator.from_dict(users)
t = ThreadedServer(service=rpycservices.ManagerService, hostname='0.0.0.0', port=int(config.get('NodeManagerService', 'port')), auto_register=False, authenticator=authenticator)
else:
t = ThreadedServer(service=rpycservices.ManagerService, hostname='0.0.0.0', port=int(config.get('NodeManagerService', 'port')), auto_register=False)
diff --git a/src/tashi/nodemanager/nodemanagerservice.py b/src/tashi/nodemanager/nodemanagerservice.py
index 099bb2d..64cf17d 100755
--- a/src/tashi/nodemanager/nodemanagerservice.py
+++ b/src/tashi/nodemanager/nodemanagerservice.py
@@ -91,17 +91,19 @@
self.log.warning('VM state was %s, call indicated %s' % (vmStates[instance.state], vmStates[old]))
if (cur == InstanceState.Exited):
del self.instances[vmId]
+ return True
+
+ if (instance.state == cur):
+ # don't do anything if state is what it should be
+ return True
+
instance.state = cur
newInst = Instance(d={'state':cur})
success = lambda: None
- try:
- cm = ConnectionManager(self.username, self.password, self.cmPort)[self.cmHost]
- cm.vmUpdate(instance.id, newInst, old)
- except Exception, e:
- self.log.exception('RPC failed for vmUpdate on CM')
- self.notifyCM.append((instance.id, newInst, old, success))
- else:
- success()
+ self.notifyCM.append((instance.id, newInst, old, success))
+ self.setInstance(instance)
+
+ success()
return True
def backupVmInfoAndFlushNotifyCM(self):
@@ -179,6 +181,9 @@
if (instance is None):
raise TashiException(d={'errno':Errors.NoSuchVmId,'msg':"There is no vmId %d on this host" % (vmId)})
return instance
+
+ def setInstance(self, instance):
+ self.vmm.setInstance(instance)
def instantiateVm(self, instance):
vmId = self.vmm.instantiateVm(instance)
@@ -245,6 +250,8 @@
instance.vmId = vmId
self.instances[vmId] = instance
newInstance = Instance(d={'id':instance.id,'state':instance.state,'vmId':instance.vmId,'hostId':instance.hostId})
+ import pprint
+ self.log.info("Debug: %s" % (pprint.pformat(newInstance)))
success = lambda: None
try:
cm.vmUpdate(newInstance.id, newInstance, InstanceState.MigrateTrans)
diff --git a/src/tashi/nodemanager/vmcontrol/qemu.py b/src/tashi/nodemanager/vmcontrol/qemu.py
index 97b5bb6..576ad9e 100644
--- a/src/tashi/nodemanager/vmcontrol/qemu.py
+++ b/src/tashi/nodemanager/vmcontrol/qemu.py
@@ -140,6 +140,13 @@
"""Will return a dict of instances by vmId to the caller"""
return dict((x, self.controlledVMs[x].instance) for x in self.controlledVMs.keys())
+ def setInstance(self, instance):
+ """Sets data in an instance object"""
+ vmId = instance.vmId
+ child = self.getChildFromPid(vmId)
+ child.instance = instance
+ self.saveChildInfo(child)
+
def matchSystemPids(self, controlledVMs):
"""This is run in a separate polling thread and it must do things that are thread safe"""
if self.nm is None:
@@ -301,7 +308,7 @@
res = self.consumeAvailable(child)
os.write(child.monitorFd, command + "\n")
if (expectPrompt):
- self.consumeUntil(child, command)
+ self.consumeUntil(child, command, timeout=timeout)
res = self.consumeUntil(child, "(qemu) ", timeout=timeout)
return res
@@ -339,8 +346,8 @@
cmd = "head -n 1 /proc/meminfo"
memoryStr = subprocess.Popen(cmd.split(), executable=cmd.split()[0], stdout=subprocess.PIPE).stdout.read().strip().split()
if (memoryStr[2] == "kB"):
- # XXXstroucki should have parameter for reserved mem
- host.memory = (int(memoryStr[1])/1024) - 512
+ # XXXstroucki should have parameter for reserved mem
+ host.memory = (int(memoryStr[1])/1024) - 512
else:
log.warning('Unable to determine amount of physical memory - reporting 0')
host.memory = 0
@@ -505,7 +512,8 @@
child.monitor = os.fdopen(child.monitorFd)
self.saveChildInfo(child)
if (issueContinue):
- self.enterCommand(child, "c")
+ #XXXstroucki: receiving a vm can take a long time
+ self.enterCommand(child, "c", timeout=None)
def stopVm(self, vmId, target, stopFirst):
"""Universal function to stop a VM -- used by suspendVM, migrateVM """
@@ -515,7 +523,7 @@
if (target):
retry = self.migrationRetries
while (retry > 0):
- res = self.enterCommand(child, "migrate %s" % (target), timeout=self.migrateTimeout)
+ res = self.enterCommand(child, "migrate -i %s" % (target), timeout=self.migrateTimeout)
retry = retry - 1
if (res.find("migration failed") == -1):
retry = -1
@@ -562,6 +570,7 @@
(vmId, cmd) = self.startVm(instance, "exec:zcat %s" % (fn))
child = self.getChildFromPid(vmId)
child.cmd = cmd
+ self.saveChildInfo(child)
return vmId
def prepReceiveVm(self, instance, source):
diff --git a/src/tashi/nodemanager/vmcontrol/vmcontrolinterface.py b/src/tashi/nodemanager/vmcontrol/vmcontrolinterface.py
index cd4fde8..89ac3ff 100644
--- a/src/tashi/nodemanager/vmcontrol/vmcontrolinterface.py
+++ b/src/tashi/nodemanager/vmcontrol/vmcontrolinterface.py
@@ -31,6 +31,10 @@
def getInstances(self):
"""Will return a dict of instances by vmId to the caller"""
raise NotImplementedError
+
+ def setInstance(self, instance):
+ """Sets data in an instance object"""
+ raise NotImplementedError
def instantiateVm(self, instance):
"""Takes an InstanceConfiguration, creates a VM based on it,