Merge stable branch from trunk
git-svn-id: https://svn.apache.org/repos/asf/incubator/tashi/branches/stable@1241776 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/INSTALL b/INSTALL
new file mode 100644
index 0000000..6240c35
--- /dev/null
+++ b/INSTALL
@@ -0,0 +1,319 @@
+Welcome to Apache Tashi!
+
+This text will show you how to get set up quickly, and then will touch
+on how to configure additional functionality.
+
+The audience for this document is someone with skills in the following areas:
+ * Creating disk images for mass installation
+ * Networking, bridging and bonding
+
+You must be able to properly create and maintain disk images that newly
+created virtual machines can boot from. This includes handling
+installations of hardware drivers necessary to run in the virtualized
+environment provided by Tashi.
+
+You must be able to properly handle connections to an existing network.
+If you do not operate the network the virtual machines are to be
+connected to, you must make arrangements with your network
+administrators for permission to connect to their network, IP address
+blocks, name service, DHCP and any other services necessary. The
+instructions here reflect an environment commonly available within a
+home network, i.e. a home router providing access to name service, NAT
+to access the internet and a DHCP service that will hand out private
+addresses without the need for prior reservations.
+
+The hardware demands for an installation of Tashi are extremely modest,
+but a Tashi cluster can grow to a large size. This installation document
+will first demonstrate a Tashi setup in a virtual machine on a 2007 era
+Macbook Pro.
+
+If you already have an existing set of physical machines, you can choose
+one now to host the cluster manager and the scheduler. You can follow
+the instructions on how to install Tashi on a single machine, then
+continue on with suggestions on how to deploy additional nodes.
+
+---+ Installation on a single host
+
+An installation on a single host will run the cluster manager, the
+primitive scheduler agent and a node manager. Install Linux on this
+machine, add the KVM virtualization packages and prepare the networking
+on the host to connect virtual machines.
+
+---++ Sample fulfillment of prerequisites
+
+For example, once you have logged into the machine as root, do the
+following to create the bridge your newly created virtual machines will
+connect to. You should be connected via console because you may lose
+your network connection if you aren't very careful here. Refer to your
+distribution's instructions on how to make this configuration permanent.
+
+# BEGIN SAMPLE PREPARATION
+# create a network bridge for the default network
+brctl addbr br0
+
+# set the bridge's hello and forward delay to 1 second
+brctl setfd br0 1
+brctl sethello br0 1
+
+# disconnect eth0 from the network, attach it to the bridge and
+# obtain an address for the bridge
+ifdown eth0;ifconfig eth0 0.0.0.0;brctl addif br0 eth0;dhclient br0
+
+# create a script /etc/qemu-ifup.0 which will allow virtual machines
+# to be attached to the default network (0).
+
+cat /etc/qemu-ifup.0
+#!/bin/sh
+
+/sbin/ifconfig $1 0.0.0.0 up
+/sbin/brctl addif br0 $1
+exit 0
+
+# Ensure the script has execute permissions
+chmod 700 /etc/qemu-ifup.0
+
+# END SAMPLE PREPARATION
+
+If you don't have RPyC version 3.1 installed, download a copy from
+http://sourceforge.net/projects/rpyc/files/main/ and install it.
+
+Prepare a virtual machine image in qcow2 format for Tashi to deploy. You
+can create this with practically any consumer or professional
+virtualization system, by converting the resulting disk image via
+qemu-img. Note that Microsoft Windows tends to install only a minimal
+set of hardware drivers, and deployment could fail because
+the necessary drivers aren't on the disk image. Search online for a
+virtual driver diskette providing virtio drivers for qemu, or other
+drivers for the virtualization layer you select. Linux and BSD VMs
+should be fine. For this installation, the default Tashi configuration
+will look for images in /tmp/images
+
+---++ Installation of Tashi code
+
+If you are reading this, you will already have obtained a distribution
+of the code. Go to the top level directory of Tashi and create a
+destination directory for the code base:
+
+ls
+DISCLAIMER doc/ etc/ LICENSE Makefile NOTICE README src/
+
+mkdir /usr/local/tashi
+
+Move the Tashi code to the directory you created
+mv * /usr/local/tashi
+
+Create Tashi executables in /usr/local/tashi/bin
+cd /usr/local/tashi
+root@grml:/usr/local/tashi# make
+Symlinking in clustermanager...
+Symlinking in nodemanager...
+Symlinking in tashi-client...
+Symlinking in primitive...
+Symlinking in zoni-cli...
+Symlinking in Accounting server...
+Done
+
+If /usr/local/tashi/src is not included in the system's default path for
+searching for python modules, ensure the environment variable PYTHONPATH
+is set before using any Tashi executables.
+
+export PYTHONPATH=/usr/local/tashi/src
+
+Start the cluster manager and populate the hosts and networks databases.
+When defining the host, you must provide the name the host has been
+given by the hostname command. If you plan on eventually having several
+hosts and networks, feel free to add them now.
+
+root@grml:/usr/local/tashi/bin# DEBUG=1 ./clustermanager
+2012-01-26 23:12:33,972 [./clustermanager:INFO] Using configuration file(s) ['/usr/local/tashi/etc/TashiDefaults.cfg']
+2012-01-26 23:12:33,972 [./clustermanager:INFO] Starting cluster manager
+**********************************************************************
+Welcome to IPython. I will try to create a personal configuration directory
+where you can customize many aspects of IPython's functionality in:
+
+/root/.ipython
+Initializing from configuration: /usr/lib/python2.6/dist-packages/IPython/UserConfig
+
+Successful installation!
+
+Please read the sections 'Initial Configuration' and 'Quick Tips' in the
+IPython manual (there are both HTML and PDF versions supplied with the
+distribution) to make sure that your system environment is properly configured
+to take advantage of IPython's features.
+
+Important note: the configuration system has changed! The old system is
+still in place, but its setting may be partly overridden by the settings in
+"~/.ipython/ipy_user_conf.py" config file. Please take a look at the file
+if some of the new settings bother you.
+
+
+Please press <RETURN> to start IPython.
+**********************************************************************
+
+In [1]: from tashi.rpycservices.rpyctypes import Host, HostState, Network
+
+In [2]: data.baseDataObject.hosts[1] = Host(d={'id':1,'name':'grml','state': HostState.Normal,'up':False})
+
+In [3]: data.baseDataObject.networks[0]=Network(d={'id':0,'name':'default'})
+
+In [4]: data.baseDataObject.save()
+
+In [5]: import os
+
+In [6]: os.kill(os.getpid(), 9)
+
+Run the cluster manager in the background:
+root@grml:/usr/local/tashi/bin# ./clustermanager &
+[1] 4289
+root@grml:/usr/local/tashi/bin# 2012-01-25 07:53:43,177 [./clustermanager:INFO] Using configuration file(s) ['/usr/local/tashi/etc/TashiDefaults.cfg']
+2012-01-25 07:53:43,177 [./clustermanager:INFO] Starting cluster manager
+
+Run the node manager in the background. Note that the hostname must be
+registered with the cluster manager, as shown above.
+
+root@grml:/usr/local/tashi/bin# ./nodemanager &
+[2] 4293
+root@grml:/usr/local/tashi/bin# 2012-01-25 07:53:59,348 [__main__:INFO] Using configuration file(s) ['/usr/local/tashi/etc/TashiDefaults.cfg', '/usr/local/tashi/etc/NodeManager.cfg']
+2012-01-25 07:53:59,392 [/usr/local/tashi/src/tashi/nodemanager/vmcontrol/qemu.py:INFO] No VM information found in /var/tmp/VmControlQemu/
+2012-01-25 07:53:59,404 [/usr/local/tashi/src/tashi/nodemanager/vmcontrol/qemu.py:INFO] Waiting for NM initialization
+
+Verify that the node is shown as being "Up".
+
+root@grml:/usr/local/tashi/bin# ./tashi-client gethosts
+ id reserved name decayed up state version memory cores notes
+----------------------------------------------------------------
+ 1 [] grml True True Normal HEAD 233 1 None
+
+Start the primitive scheduling agent:
+root@grml:/usr/local/tashi/bin# ./primitive &
+[3] 4312
+
+Verify that the cluster manager has full communication with the host.
+When this has happened, decayed is False.
+
+root@grml:/usr/local/tashi/bin# ./tashi-client gethosts
+ id reserved name decayed up state version memory cores notes
+----------------------------------------------------------------
+ 1 [] grml False True Normal HEAD 233 1 None
+
+Check the presence of a disk image:
+
+root@grml:/usr/local/tashi/bin# ls /tmp/images/
+debian-wheezy-amd64.qcow2
+root@grml:/usr/local/tashi/bin# ./tashi-client getimages
+ id imageName imageSize
+---------------------------------------
+ 0 debian-wheezy-amd64.qcow2 1.74G
+
+Create a VM with 1 core and 128 MB of memory using our disk image in
+non-persistent mode:
+
+root@grml:/usr/local/tashi/bin# ./tashi-client createVm --cores 1 \
+    --memory 128 --name wheezy --disks debian-wheezy-amd64.qcow2
+
+ id hostId name user state disk memory cores
+---------------------------------------------------------------------
+ 1 None wheezy root Pending debian-wheezy-amd64.qcow2 128 1
+
+2012-02-02 22:09:58,392 [./primitive:INFO] Scheduling instance wheezy (128 mem, 1 cores, 0 uid) on host grml
+2012-02-02 22:09:58,398 [/usr/local/tashi/src/tashi/nodemanager/vmcontrol/qemu.pyc:INFO] Executing command: /usr/bin/kvm -clock dynticks -drive file=/tmp/images/debian-wheezy-amd64.qcow2,if=ide,index=0,cache=off,snapshot=on -net nic,macaddr=52:54:00:90:2a:9d,model=virtio,vlan=0 -net tap,ifname=tashi1.0,vlan=0,script=/etc/qemu-ifup.0 -m 128 -smp 1 -serial null -vnc none -monitor pty
+2012-02-02 22:09:58,412 [/usr/local/tashi/src/tashi/nodemanager/vmcontrol/qemu.pyc:INFO] Adding vmId 5472
+
+Verify the machine is running:
+
+root@grml:/usr/local/tashi/bin# ./tashi-client getinstances
+ id hostId name user state disk memory cores
+---------------------------------------------------------------------
+ 1 1 wheezy root Running debian-wheezy-amd64.qcow2 128 1
+
+After the machine has had a chance to boot, find out what address it got. If
+you have a DHCP server on your network, search the pool of addresses:
+
+root@grml:/usr/local/tashi/bin# ifconfig br0
+br0 Link encap:Ethernet HWaddr 00:0c:29:62:b3:76
+ inet addr:192.168.244.131 Bcast:192.168.244.255 Mask:255.255.255.0
+ inet6 addr: fe80::20c:29ff:fe62:b376/64 Scope:Link
+ UP BROADCAST RUNNING MULTICAST MTU:1500 Metric:1
+ RX packets:2622 errors:0 dropped:0 overruns:0 frame:0
+ TX packets:1598 errors:0 dropped:0 overruns:0 carrier:0
+ collisions:0 txqueuelen:0
+ RX bytes:730925 (713.7 KiB) TX bytes:226530 (221.2 KiB)
+
+Find the MAC address given to the VM
+
+root@grml:/usr/local/tashi/bin# ./tashi-client getmyinstances \
+    --show-nics --hide-disk
+ id hostId name user state memory cores nics
+----------------------------------------------------------------------------------------------------
+ 1 1 wheezy root Running 128 1 [{'ip': None, 'mac': '52:54:00:90:2a:9d', 'network': 0}]
+
+Look for that MAC address on the local network
+root@grml:/usr/local/tashi/bin# arp-scan -I br0 192.168.244.0/24|grep 90:2a:9d
+Interface: br0, datalink type: EN10MB (Ethernet)
+Starting arp-scan 1.6 with 256 hosts (http://www.nta-monitor.com/tools/arp-scan/)
+192.168.244.136 52:54:00:90:2a:9d QEMU
+
+Log into the VM:
+root@grml:/usr/local/tashi/bin# ssh root@192.168.244.136
+The authenticity of host '192.168.244.136 (192.168.244.136)' can't be established.
+RSA key fingerprint is af:f2:1a:3a:2b:7c:c3:3b:6a:04:4f:37:bb:75:16:58.
+Are you sure you want to continue connecting (yes/no)? yes
+Warning: Permanently added '192.168.244.136' (RSA) to the list of known hosts.
+root@192.168.244.136's password:
+Linux debian 3.1.0-1-amd64 #1 SMP Tue Jan 10 05:01:58 UTC 2012 x86_64
+
+The programs included with the Debian GNU/Linux system are free software;
+the exact distribution terms for each program are described in the
+individual files in /usr/share/doc/*/copyright.
+
+Debian GNU/Linux comes with ABSOLUTELY NO WARRANTY, to the extent
+permitted by applicable law.
+Last login: Thu Jan 19 15:06:22 2012 from login.cirrus.pdl.cmu.local
+debian:~#
+debian:~# uname -a
+Linux debian 3.1.0-1-amd64 #1 SMP Tue Jan 10 05:01:58 UTC 2012 x86_64 GNU/Linux
+debian:~# cat /proc/cpuinfo
+processor : 0
+vendor_id : AuthenticAMD
+cpu family : 6
+model : 2
+model name : QEMU Virtual CPU version 0.14.0
+stepping : 3
+cpu MHz : 2193.593
+cache size : 512 KB
+fpu : yes
+fpu_exception : yes
+cpuid level : 4
+wp : yes
+flags : fpu pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush mmx fxsr sse sse2 syscall nx lm up nopl pni cx16 popcnt hypervisor lahf_lm svm abm sse4a
+bogomips : 4387.18
+TLB size : 1024 4K pages
+clflush size : 64
+cache_alignment : 64
+address sizes : 40 bits physical, 48 bits virtual
+power management:
+
+debian:~# perl -e 'print "My new VM!\n";'
+My new VM!
+debian:~# halt
+
+Broadcast message from root@debian (pts/0) (Wed Jan 25 02:01:43 2012):
+
+The system is going down for system halt NOW!
+debian:~# Connection to 192.168.244.136 closed by remote host.
+Connection to 192.168.244.136 closed.
+
+2012-02-02 22:18:35,662 [/usr/local/tashi/src/tashi/nodemanager/vmcontrol/qemu.pyc:INFO] Removing vmId 5472 because it is no longer running
+2012-02-02 22:18:35,662 [/usr/local/tashi/src/tashi/nodemanager/vmcontrol/qemu.pyc:INFO] Removing any scratch for wheezy
+2012-02-02 22:18:36,461 [./primitive:INFO] VM exited: wheezy
+
+Verify the VM is no longer running:
+
+root@grml:/usr/local/tashi/bin# ./tashi-client getinstances
+ id hostId name user state disk memory cores
+--------------------------------------------
+
+You have now completed the simplest form of a Tashi install: a single
+machine providing hosting, scheduling and management services. For
+additional information on what you can do, please view the documentation
+in the doc/ directory.
diff --git a/Makefile b/Makefile
index b790711..174defd 100644
--- a/Makefile
+++ b/Makefile
@@ -27,6 +27,15 @@
all: bin src/utils/nmd src/tags doc/html aws
@echo Done
+package: src DISCLAIMER INSTALL LICENSE NOTICE README
+ @echo "Building package in apache-tashi.tar.gz"
+ rm -rf apache-tashi.tar.gz apache-tashi
+ mkdir apache-tashi
+ cp -rp doc etc Makefile src DISCLAIMER INSTALL LICENSE NOTICE README apache-tashi/
+ find apache-tashi -type d -name ".svn"|xargs rm -rf
+ tar zcf apache-tashi.tar.gz apache-tashi
+ rm -rf apache-tashi
+
doc: rmdoc doc/html
@echo Done
@@ -35,7 +44,7 @@
@echo Done
version:
- sed -i "s/version = .*/version = \"`date`\"/" src/tashi/version.py
+ sed -i "s/version = .*/version = \"`date +%Y-%m-%d`\"/" src/tashi/version.py
aws: src/tashi/aws/wsdl/AmazonEC2_services_types.py src/tashi/aws/wsdl/AmazonEC2_services_server.py
@@ -51,49 +60,48 @@
rmaws:
if test -e src/tashi/aws/wsdl/2009-04-04.ec2.wsdl; then echo Removing aws...; rm -f src/tashi/aws/wsdl/2009-04-04.ec2.wsdl; rm -f src/tashi/aws/wsdl/AmazonEC2_*.py; fi
-# Implicit builds
-# src/utils/nmd: src/utils/Makefile src/utils/nmd.c
-# @echo Building nmd...
-# (cd src/utils; make)
-# ln -s ../src/utils/nmd ./bin/nmd
-
src/utils/nmd: src/utils/nmd.py
- ln -s ../src/utils/nmd.py ./bin/nmd.py
+ ln -s ../src/utils/nmd.py ./bin/nmd
#rmnmd:
# if test -e src/utils/nmd; then echo Removing nmd...; (cd src/utils; make clean); rm -f bin/nmd; fi
rmnmd:
- echo Removing nmd...; rm -f bin/nmd.py
+ echo Removing nmd...; rm -f bin/nmd
-bin: bindir bin/clustermanager.py bin/nodemanager.py bin/tashi-client.py bin/primitive.py bin/zoni-cli.py
+bin: bindir bin/clustermanager bin/nodemanager bin/tashi-client bin/primitive bin/zoni-cli bin/accounting
bindir:
if test ! -d bin; then mkdir bin; fi
-rmbin: rmclustermanager rmnodemanager rmtashi-client rmprimitive rmzoni-cli
+rmbin: rmclustermanager rmnodemanager rmtashi-client rmprimitive rmzoni-cli rmaccounting
if test -d bin; then rmdir bin; fi
bin/getInstances:
if test ! -e bin/getInstances; then (echo "Generating client symlinks..."; cd bin; PYTHONPATH=../src ../src/tashi/client/client.py --makesyms); fi
rmclients:
if test -e bin/getInstances; then (echo Removing client symlinks...; cd bin; PYTHONPATH=../src ../src/tashi/client/client.py --rmsyms; cd ..); fi
-bin/clustermanager.py: src/tashi/clustermanager/clustermanager.py
+bin/accounting: src/tashi/accounting/accounting.py
+ @echo Symlinking in Accounting server...
+ (cd bin; ln -s ../src/tashi/accounting/accounting.py accounting)
+rmaccounting:
+ if test -e bin/accounting; then echo Removing Accounting server symlink...; rm bin/accounting; fi
+bin/clustermanager: src/tashi/clustermanager/clustermanager.py
@echo Symlinking in clustermanager...
- (cd bin; ln -s ../src/tashi/clustermanager/clustermanager.py .)
+ (cd bin; ln -s ../src/tashi/clustermanager/clustermanager.py clustermanager)
rmclustermanager:
- if test -e bin/clustermanager.py; then echo Removing clustermanager symlink...; rm bin/clustermanager.py; fi
-bin/nodemanager.py: src/tashi/nodemanager/nodemanager.py
+ if test -e bin/clustermanager; then echo Removing clustermanager symlink...; rm bin/clustermanager; fi
+bin/nodemanager: src/tashi/nodemanager/nodemanager.py
@echo Symlinking in nodemanager...
- (cd bin; ln -s ../src/tashi/nodemanager/nodemanager.py .)
+ (cd bin; ln -s ../src/tashi/nodemanager/nodemanager.py nodemanager)
rmnodemanager:
- if test -e bin/nodemanager.py; then echo Removing nodemanager symlink...; rm bin/nodemanager.py; fi
-bin/primitive.py: src/tashi/agents/primitive.py
+ if test -e bin/nodemanager; then echo Removing nodemanager symlink...; rm bin/nodemanager; fi
+bin/primitive: src/tashi/agents/primitive.py
@echo Symlinking in primitive...
- (cd bin; ln -s ../src/tashi/agents/primitive.py .)
+ (cd bin; ln -s ../src/tashi/agents/primitive.py primitive)
rmprimitive:
- if test -e bin/primitive.py; then echo Removing primitve-agent symlink...; rm bin/primitive.py; fi
-bin/tashi-client.py:
+ if test -e bin/primitive; then echo Removing primitve-agent symlink...; rm bin/primitive; fi
+bin/tashi-client:
@echo Symlinking in tashi-client...
- (cd bin; ln -s ../src/tashi/client/tashi-client.py .)
+ (cd bin; ln -s ../src/tashi/client/tashi-client.py tashi-client)
rmtashi-client:
- if test -e bin/tashi-client.py; then echo Removing tashi-client symlink...; rm bin/tashi-client.py; fi
+ if test -e bin/tashi-client; then echo Removing tashi-client symlink...; rm bin/tashi-client; fi
src/tags:
@echo Generating tags...
(cd src; ctags-exuberant -R --c++-kinds=+p --fields=+iaS --extra=+q -f ./tags .)
@@ -107,14 +115,15 @@
if test -d doc/html; then echo Removing HTML docs...; rm -rf ./doc/html; fi
# Zoni
-bin/zoni-cli.py:
+bin/zoni-cli:
@echo Symlinking in zoni-cli...
- (cd bin; ln -s ../src/zoni/client/zoni-cli.py .)
+ (cd bin; ln -s ../src/zoni/client/zoni-cli.py zoni-cli)
+# why necessarily put this in /usr/local/bin like nothing else?
usr/local/bin/zoni:
@echo Creating /usr/local/bin/zoni
(echo '#!/bin/bash\nPYTHONPATH=$(shell pwd)/src $(shell pwd)/bin/zoni-cli.py $$*' > /usr/local/bin/zoni; chmod 755 /usr/local/bin/zoni)
rmzoni-cli:
- if test -e bin/zoni-cli.py; then echo Removing zoni-cli symlink...; rm bin/zoni-cli.py; fi
+ if test -e bin/zoni-cli; then echo Removing zoni-cli symlink...; rm bin/zoni-cli; fi
if test -e /usr/local/bin/zoni; then echo Removing zoni...; rm /usr/local/bin/zoni; fi
## for now only print warnings having to do with bad indentation. pylint doesn't make it easy to enable only 1,2 checks
diff --git a/NOTICE b/NOTICE
index bf1c1c8..f444b9a 100644
--- a/NOTICE
+++ b/NOTICE
@@ -1,5 +1,5 @@
Apache Tashi
-Copyright 2008-2011 The Apache Software Foundation
+Copyright 2008-2012 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
diff --git a/doc/DEVELOPMENT b/doc/DEVELOPMENT
index 8729da0..32d1b20 100644
--- a/doc/DEVELOPMENT
+++ b/doc/DEVELOPMENT
@@ -1,16 +1,10 @@
Current goals:
- * integrate bug fixes from Tashi sites into SVN.
- * Tashi is working well for us developers, so we should fork
- a stable version as beta release. Xen startup was problematic
- as of September, so advise on caveats there. Probably not include
- Zoni for now.
+ * Add more hardware support for Zoni.
+ * Add virtual hardware to Zoni to allow users to
+ add support for their own hardware?
Future goals:
- * How should Tashi accounting be done? Database entries or
- flat files?
- * Support host auto-registration, but only by affirmative option.
* Consider using libraries like libcloud to abstract VMM interactions.
- * Add more hardware support for Zoni.
Other ideas:
* Make available a console aggregator for user's VMs.
diff --git a/doc/INSTALL2 b/doc/INSTALL2
new file mode 100644
index 0000000..66ad7dc
--- /dev/null
+++ b/doc/INSTALL2
@@ -0,0 +1,57 @@
+---+ Installation on additional hosts
+
+To add additional hosts to the Apache Tashi cluster, they should be
+installed to satisfy the prerequisites as described in the main
+installation document. However, only the node manager will have to be
+launched on these hosts. There should be only one cluster manager and
+scheduling agent in the cluster.
+
+You must register the host's hostname with the clustermanager, as shown
+in the main installation document.
+
+You can then start the node manager manually, or have it start from the
+system initialization scripts.
+
+---+ Deployment over multiple networks
+
+To add additional networks to the Apache Tashi cluster, they should be
+brought to the hosts as VLAN network interfaces, attached to software
+bridges. The new network will have to be registered with the cluster
+manager, as detailed in the main installation document. Scheduling of
+virtual machines is open over the cluster, so each host needs to provide
+access to the same networks.
+
+Generally, your network switch will have to be configured to send
+packets "tagged" with the VLAN identifiers for all networks that the
+cluster is to host virtual machines on.
+
+The bridges can be set up automatically at boot. For example, a stanza from
+/etc/network/interfaces that configures a bridge for VLAN 11, using
+jumbo frames, will look like this:
+
+auto br11
+iface br11 inet manual
+ mtu 9000
+ bridge_ports eth0.11
+ bridge_fd 1
+ bridge_hello 1
+ up ifconfig eth0.11 mtu 9000
+
+The corresponding /etc/qemu-ifup.11 looks like this:
+
+#!/bin/sh
+
+/sbin/ifconfig $1 0.0.0.0 up mtu 9000
+/sbin/brctl addif br11 $1
+exit 0
+
+Note that the entire path of a network connection must be configured to
+use jumbo frames, if the virtual machines are to use them.
+
+---+ Accounting server
+
+An accounting server is available in the distribution. It will log
+events from the cluster manager and node managers, as well as obtain
+periodic state from the cluster manager on what virtual machines are
+running. It can be started by running "accounting" from the binaries
+directory, and then starting the cluster services.
diff --git a/doc/RELEASES b/doc/RELEASES
new file mode 100644
index 0000000..978834d
--- /dev/null
+++ b/doc/RELEASES
@@ -0,0 +1,140 @@
+---+ Apache Tashi release policy
+
+This document is based on the draft incubator release management
+document available at
+http://incubator.apache.org/guides/releasemanagement.html
+
+---++ Apache RAT testing
+
+You can use Apache RAT to check for problems such as required files
+that are missing, or files that don't include an Apache license header.
+
+To run RAT, obtain the binary release of Apache RAT, untar it and run
+java -jar apache-rat-0.8/apache-rat-0.8.jar <top directory>
+
+---++ Uploading artifacts
+
+---+++ Distribution
+
+The destination for an artifact is www.apache.org/dist/incubator/tashi .
+This location is accessible at /www/www.apache.org/dist/incubator/tashi
+on people.apache.org.
+
+---+++ Mirroring
+
+Releases must be mirrored. Checksums, KEYS, signatures are mirrored too.
+Per the release signing guide, the mirrored copies of these files are
+non-authoritative. In-site links must point to the originals on
+www.apache.org.
+
+---+++ Archiving
+
+All releases must be archived for future reference. Archives are stored
+under http://archive.apache.org/dist/incubator/tashi. This archiving
+happens automatically for all artifacts in
+www.apache.org/dist/incubator/tashi.
+
+Old releases should be removed from www.apache.org/dist/incubator/tashi.
+The release will then only remain on the archive server.
+
+---+++ Permissions
+
+All files should be owned by the "incubator" group. Ownership and mode
+should be <user>:incubator 664 for files and <user>:incubator 775 for
+directories.
+
+---+++ Checksums and signatures
+
+The KEYS file should contain the public keys for code signers. The
+private key needs to be kept safe. Code-signing keys should be linked to
+the Apache web of trust.
+
+---+++ Modifications
+
+Uploaded artifacts must never be modified. Excluded from this policy are
+READMEs, NOTES, KEYS and the like.
+
+---++ Distribution checklist
+
+ * Destination: www.apache.org/dist/incubator/tashi
+ * umask 022
+ * group owner is "incubator"
+ * Checksums and signatures match the artifact
+ * Old release archived
+ * Links to KEYS, signatures and checksums point to www.apache.org
+ * Package into .tar.gz
+
+---++ Dealing with defects
+
+Uploaded artifacts must never be modified (repeat). A new numbered
+release should be generated. (Assume this will require a vote)
+
+Serious defects may call for a release withdrawal by archiving,
+announcement on the mailing lists and adding a notice to the download
+page.
+
+---++ Release checklist
+
+ * Packages:
+ * Artifacts unpack correctly?
+ * Documentation is readable?
+ * For distributed libraries:
+ * Licenses are included, along with NOTICEs?
+ * Licenses comply with incubator policy?
+ * LICENSE and NOTICE contain required sections?
+ * Crypto code satisfies export regulations?
+ * Copyright notices:
+ * Source files include license boilerplate?
+ * Source files with licenses not in LICENSE?
+ * Check policy on header file compliance?
+ * Incubator requirements:
+ * DISCLAIMER included?
+ * Check legal STATUS document?
+ * Source package:
+ * Build instructions exist and work?
+ * License headers applied correctly?
+ * Version control cruft cleaned?
+ * Keys:
+ * KEYS file contains proper signing keys?
+ * Signing key is available on public keyservers?
+ * Version control:
+ * Release was built from specific tag?
+ * Tag named "APACHE_TASHI_<version>"?
+ * Miscellaneous:
+ * Deprecations, incompatibilities documented in RELEASE_NOTES?
+
+---++ Naming
+
+The release should be named apache-tashi-<version>-incubating. <version>
+should be a six-digit year-and-month, optionally followed by a period
+and number in case of bug fixes applied to the base version.
+
+---++ Guidelines
+
+In case of need for additional guidance, the following mailing lists are
+helpful:
+
+ * legal-discuss: for licensing issues
+ * infrastructure-issues: for issues pertaining to release infrastructure
+
+---++ Announcements
+
+Announce releases from an apache.org address. Announcements should be
+signed plain text.
+
+---++ Release procedure
+
+Releases must be approved by Incubator PMC.
+
+A release should first be proposed to the PPMC. The proposal should include:
+ * Link to release candidate artifact (in apache home directory)
+ * Link to tag from which the release was built
+
+Should the PPMC vote pass, call for a vote on incubator.apache.org.
+In addition to the above, provide a link to the PPMC voting thread.
+
+Remove release candidate artifacts on conclusion of voting.
+
+---++ Invocation
+
+Oh god...
diff --git a/etc/Accounting.cfg b/etc/Accounting.cfg
new file mode 100644
index 0000000..1b7d603
--- /dev/null
+++ b/etc/Accounting.cfg
@@ -0,0 +1,25 @@
+[Accounting]
+service = tashi.accounting.AccountingService
+
+[AccountingService]
+port = 2228
+pollSleep = 600
+
+[handler_fileHandler]
+class = FileHandler
+level = NOTSET
+formatter = standardFormatter
+args = ("/var/log/accounting.log",)
+
+[handlers]
+#keys = consoleHandler,publisherHandler,fileHandler
+keys = consoleHandler,fileHandler
+
+[loggers]
+keys = root
+
+[logger_root]
+level = DEBUG
+#handlers = consoleHandler,publisherHandler,fileHandler,syslogHandler
+handlers = fileHandler
+propagate = 1
diff --git a/etc/Agent.cfg b/etc/Agent.cfg
new file mode 100644
index 0000000..88762ea
--- /dev/null
+++ b/etc/Agent.cfg
@@ -0,0 +1,18 @@
+[handler_fileHandler]
+class = FileHandler
+level = NOTSET
+formatter = standardFormatter
+args = ("/var/log/agent.log",)
+
+[handlers]
+#keys = consoleHandler,publisherHandler,fileHandler
+keys = consoleHandler,fileHandler
+
+[loggers]
+keys = root
+
+[logger_root]
+level = DEBUG
+#handlers = consoleHandler,publisherHandler,fileHandler,syslogHandler
+handlers = consoleHandler,fileHandler
+propagate = 1
diff --git a/etc/NodeManager.cfg b/etc/NodeManager.cfg
index fc65920..a47bccf 100644
--- a/etc/NodeManager.cfg
+++ b/etc/NodeManager.cfg
@@ -58,7 +58,7 @@
propagate = 1
[Vfs]
-prefix = /dfs
+prefix = /tmp
[XenPV]
vmNamePrefix = tashi
@@ -75,10 +75,11 @@
port = 9883
registerHost = False
registerFrequency = 10.0
-infoFile = /var/tmp/nm.dat
clusterManagerHost = localhost ; Clustermanager hostname
clusterManagerPort = 9882
statsInterval = 0.0
+;accountingHost = clustermanager
+;accountingPort = 2228
;bind = 0.0.0.0 ; not supported (Thrift is missing support to specify what to bind to!)
[Security]
diff --git a/etc/TashiDefaults.cfg b/etc/TashiDefaults.cfg
index 94a8c93..fd034eb 100644
--- a/etc/TashiDefaults.cfg
+++ b/etc/TashiDefaults.cfg
@@ -55,6 +55,8 @@
maxMemory = 8192
maxCores = 8
allowDuplicateNames = False
+;accountingHost = clustermanager
+;accountingPort = 2228
;bind = 0.0.0.0 ; not supported (Thrift is missing support to specify what to bind to!)
[GetentOverride]
@@ -87,6 +89,10 @@
uri = mysql://root@clustermanager/tashi
password = changeme
+# Accounting portion
+[Accounting]
+publisher = tashi.messaging.GangliaPublisher
+
# NodeManger portion
[NodeManager]
dfs = tashi.dfs.Vfs
@@ -100,7 +106,6 @@
port = 9883
registerHost = False
registerFrequency = 10.0
-infoFile = /var/tmp/nm.dat
# Clustermanger hostname
clusterManagerHost = localhost
clusterManagerPort = 9882
@@ -108,7 +113,7 @@
;bind = 0.0.0.0 ; not supported (Thrift is missing support to specify what to bind to!)
[Qemu]
-qemuBin = /usr/local/bin/qemu-system-x86_64
+qemuBin = /usr/bin/kvm
infoDir = /var/tmp/VmControlQemu/
pollDelay = 1.0
migrationRetries = 10
@@ -130,7 +135,7 @@
defaultDiskType=qcow
[Vfs]
-prefix = /dfs
+prefix = /tmp
[LocalityService]
host = localityserverhostname
@@ -149,12 +154,12 @@
publisher = tashi.messaging.GangliaPublisher
[Primitive]
-hook1 = tashi.agents.DhcpDns
+#hook1 = tashi.agents.DhcpDns
scheduleDelay = 2.0
densePack = False
[MauiWiki]
-hook1 = tashi.agents.DhcpDns
+#hook1 = tashi.agents.DhcpDns
refreshTime = 5
authuser = changeme
authkey = 1111
diff --git a/scripts/create b/scripts/create
deleted file mode 100755
index 4127a98..0000000
--- a/scripts/create
+++ /dev/null
@@ -1,26 +0,0 @@
-#! /bin/bash
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-if [[ $# -ne 4 ]]; then
- echo "create <name> <type> <image> <persistent>"
-else
- mac=`echo $((RANDOM%256)) $((RANDOM%256)) $((RANDOM%256)) | awk '{printf("%2.2x:%2.2x:%2.2x", $1, $2, $3)}'`
- echo ./bin/createVm "Instance(d={'name':'$1','type':$2,'disks':[DiskConfiguration(d={'uri':'$3','persistent':$4})],'nics':[NetworkConfiguration(d={'network':2,'mac':'52:54:00:$mac'})], 'hints':{'display':'True'}})"
- ./bin/createVm "Instance(d={'name':'$1','type':$2,'disks':[DiskConfiguration(d={'uri':'$3','persistent':$4})],'nics':[NetworkConfiguration(d={'network':2,'mac':'52:54:00:$mac'})], 'hints':{'display':'True'}})"
-fi
diff --git a/scripts/demo-yahoo-08-14-08 b/scripts/demo-yahoo-08-14-08
deleted file mode 100755
index 9519fde..0000000
--- a/scripts/demo-yahoo-08-14-08
+++ /dev/null
@@ -1,109 +0,0 @@
-#! /bin/bash
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-export PYTHONPATH=`pwd`/src/
-
-NUM=10
-
-BIGNODE=172.16.250.254
-
-BASEID=0
-
-date
-
-echo "Creating small VMs..."
-for h in `seq -w 1 ${NUM}`; do
- echo "Creating VM #${h}..."
- INFO=`./create 1 $h 2>/dev/null | grep "[ \t]id:"`
- if [[ ${BASEID} -eq 0 ]]; then
- BASEID=`echo ${INFO} | sed 's/[^0-9]*\([0-9]*\).*/\1/'`
- fi
-done
-echo "Small VMs created"
-echo "Waiting for startup..."
-while [[ true ]]; do
- PASS="True"
- for h in `cat ~/hosts/vms.txt`; do
- HN=`ssh root@bd.${h} "hostname 2> /dev/null" 2> /dev/null | cut -c -2`
- if [[ "${HN}" != "vm" ]]; then
- PASS="False"
- fi
- done
- if [[ "${PASS}" == "True" ]]; then
- break
- fi
- sleep 1
-done
-echo "Waiting finished"
-echo "Starting work on small VMs..."
-for h in `cat ~/hosts/vms.txt`; do
- ssh root@bd.${h} "./run > /dev/null 2>&1" > /dev/null 2>&1 &
-done
-echo "Creating large VM..."
-./create 2 11 > /dev/null 2>&1
-echo "Done creating large VM"
-while [[ true ]]; do
- sleep 5
- COUNT=0
- for h in `cat ~/hosts/vms.txt`; do
- CNT=`ssh root@bd.${h} "ls /x/mryan3/cvm-out/*/*.txt 2> /dev/null | wc -l 2> /dev/null" 2> /dev/null`
- COUNT=$((COUNT+CNT))
- done
- echo "${COUNT}/64 work items completed..."
- if [[ ${COUNT} -eq 64 ]]; then
- break
- fi
-done
-echo "Work on small VMs completed"
-for i in `seq 1 ${NUM}`; do
- wait
-done
-echo "Collecting output from small VMs to the large VM..."
-ssh root@bd.${BIGNODE} ./gather > /dev/null 2>&1
-echo "Done collecting output"
-echo "Destroying small VMs..."
-for i in `seq 1 ${NUM}`; do
- ./bin/destroyVm $((i+BASEID-1)) > /dev/null 2>&1
-done
-echo "Done destroying small VMs"
-echo "Doing work on large VM..."
-ssh root@bd.${BIGNODE} ./build > /dev/null 2>&1 &
-while [[ true ]]; do
- sleep 2
- SIZE=`ssh root@bd.${BIGNODE} "du -hs ./out.e 2> /dev/null | awk '{print "'$1'"}' 2> /dev/null" 2> /dev/null`
- echo "${SIZE}/154M output data generated..."
- if [[ "${SIZE}" == "154M" ]]; then
- break
- fi
-done
-wait
-echo "Work on large VM complete"
-echo "Copying final output file to localhost..."
-scp root@bd.${BIGNODE}:./out.e /tmp/out.e
-echo "Copy complete"
-echo "Destroying large VM..."
-./bin/destroyVm $((11+BASEID-1)) > /dev/null 2>&1
-echo "Large VM destroyed"
-echo "Generating output image from etree..."
-(cd ~/local/src/mryan3/BigDatavis/src; ./util/draw_slice -d /tmp/out.e 0 0 0 512 0 0 0 512 0 512 512 /tmp/output.jpg > /dev/null 2>&1)
-echo "Image complete"
-
-date
-
-qiv /tmp/output.jpg
diff --git a/scripts/mryan3-database-setup b/scripts/mryan3-database-setup
deleted file mode 100755
index 14324f8..0000000
--- a/scripts/mryan3-database-setup
+++ /dev/null
@@ -1,28 +0,0 @@
-#! /bin/sh
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-./bin/addUser "User(d={'username':'mryan3'})"
-./bin/addPersistentImage "PersistentImage(d={'userId':1,'name':'i386-ubuntu.qcow'})"
-./bin/addInstanceConfiguration "InstanceConfiguration(d={'name':'i386-512','memory':512,'cores':1})"
-./bin/addHardDiskConfiguration "HardDiskConfiguration(d={'index':0,'persistentImageId':1,'persistent':False,'instanceConfigurationId':1})"
-./bin/addNetworkInterfaceConfiguration "NetworkInterfaceConfiguration(d={'index':0,'instanceConfigurationId':1})"
-./bin/addHost "Host(d={'hostname':'blade043'})"
-./bin/addHost "Host(d={'hostname':'blade044'})"
-./bin/addHost "Host(d={'hostname':'blade045'})"
-./bin/addHost "Host(d={'hostname':'blade074'})"
diff --git a/scripts/stress b/scripts/stress
deleted file mode 100755
index 623f52c..0000000
--- a/scripts/stress
+++ /dev/null
@@ -1,109 +0,0 @@
-#! /bin/bash
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-if [[ $# -eq 1 ]]; then
- VMS=$1
-else
- VMS=20
-fi
-
-date
-
-#HOSTS=`./bin/getHosts | grep name | wc -l`
-HOSTS="4"
-
-echo "Hosts: ${HOSTS}"
-
-echo "Create:"
-
-MID=10240000
-for i in `seq -w 1 $VMS`; do
- echo "./scripts/create 2 $i"
- ID=`./scripts/create 2 $i | grep "id: " | awk '{print $2}'`
- if [[ ${ID} -lt ${MID} ]]; then
- MID=${ID}
- fi
-done
-
-date
-
-echo "Wait:"
-
-while [[ true ]]; do
- CNT=`./bin/getInstances | grep -c Running`
- echo ${CNT}
- if [[ ${CNT} -eq ${VMS} ]]; then
- break
- fi
- sleep 1
-done
-
-sleep 20
-
-date
-
-echo "Migrate:"
-
-for i in `seq 0 $((VMS-1))`; do
- echo "./bin/migrateVm $((MID+i)) $((((i+1)%HOSTS)+1))"
- ./bin/migrateVm $((MID+i)) $((((i+1)%HOSTS)+1)) > /dev/null &
-done
-for i in `seq 0 $((VMS-1))`; do
- wait
-done
-
-date
-
-echo "Wait:"
-
-while [[ true ]]; do
- CNT=`./bin/getInstances | grep -c Running`
- echo ${CNT}
- if [[ ${CNT} -eq ${VMS} ]]; then
- break
- fi
- sleep 1
-done
-
-date
-
-echo "Destroy:"
-
-for i in `seq 0 $((VMS-1))`; do
- echo "./bin/destroyVm $((MID+i))"
- ./bin/destroyVm $((MID+i)) > /dev/null 2>&1 &
-done
-for i in `seq 0 $((VMS-1))`; do
- wait
-done
-
-date
-
-echo "Wait:"
-
-while [[ true ]]; do
- CNT=`./bin/getInstances | wc -l`
- echo ${CNT}
- if [[ ${CNT} -eq 1 ]]; then
- break
- fi
- sleep 1
-done
-
-date
diff --git a/scripts/tomer-database-setup b/scripts/tomer-database-setup
deleted file mode 100755
index 7fd718b..0000000
--- a/scripts/tomer-database-setup
+++ /dev/null
@@ -1,29 +0,0 @@
-#! /bin/sh
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-./bin/addUser "User(d={'username':'administrator'})"
-./bin/addUser "User(d={'username':'tshiran'})"
-./bin/addUser "User(d={'username':'jcipar'})"
-./bin/addPersistentImage "PersistentImage(d={'userId':1,'name':'hardy-25G.img'})"
-./bin/addPersistentImage "PersistentImage(d={'userId':1,'name':'hardy-25G-pdl.img','parentId':1})"
-./bin/addPersistentImage "PersistentImage(d={'userId':2,'name':'hardy-25G-tshiran.img','parentId':2})"
-./bin/addPersistentImage "PersistentImage(d={'userId':3,'name':'hardy-25G-jcipar.img','parentId':2})"
-./bin/addPersistentImage "PersistentImage(d={'userId':2,'name':'hardy-25G-tshiran-hadoop.img','parentId':3})"
-./bin/addHost "Host(d={'hostname':'ss306'})"
-./bin/addHost "Host(d={'hostname':'ss308'})"
diff --git a/scripts/resume b/src/tashi/accounting/__init__.py
old mode 100755
new mode 100644
similarity index 74%
rename from scripts/resume
rename to src/tashi/accounting/__init__.py
index 54af567..2b198bb
--- a/scripts/resume
+++ b/src/tashi/accounting/__init__.py
@@ -1,5 +1,3 @@
-#! /bin/bash
-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -17,4 +15,4 @@
# specific language governing permissions and limitations
# under the License.
-./bin/resumeVm "Instance(d={'name':'foobar','type':$1,'disks':[DiskConfiguration(d={'uri':'i386-ubuntu.qcow','persistent':False})],'nics':[NetworkConfiguration(d={'network':2,'mac':'52:54:00:00:05:$2'})], 'hints':{'display':'True'}})" "\"$3\""
+from accountingservice import AccountingService
diff --git a/src/tashi/accounting/accounting.py b/src/tashi/accounting/accounting.py
new file mode 100755
index 0000000..93d2999
--- /dev/null
+++ b/src/tashi/accounting/accounting.py
@@ -0,0 +1,100 @@
+#!/usr/bin/python
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import sys
+import signal
+import logging.config
+
+from tashi.rpycservices import rpycservices
+from rpyc.utils.server import ThreadedServer
+#from rpyc.utils.authenticators import TlsliteVdbAuthenticator
+
+#from tashi.rpycservices.rpyctypes import *
+from tashi.util import getConfig, createClient, instantiateImplementation, boolean, debugConsole, signalHandler
+import tashi
+
+class Accounting(object):
+    """Accounting daemon: loads the configured hooks and runs the accounting RPC server."""
+
+    def __init__(self, config, cmclient):
+        """Store the configuration and cluster manager client, then load hooks.
+
+        Every option in the [Accounting] section whose lower-cased name
+        starts with "hook" is instantiated, in sorted option order. A hook
+        that fails to load is logged and skipped.
+        """
+        self.config = config
+        self.cm = cmclient
+        self.hooks = []
+        self.log = logging.getLogger(__file__)
+
+        items = self.config.items("Accounting")
+        items.sort()
+        for (name, value) in items:
+            if not name.lower().startswith("hook"):
+                continue
+            try:
+                self.hooks.append(instantiateImplementation(value, config, cmclient, False))
+            except Exception:
+                # Best effort: one broken hook must not prevent startup.
+                self.log.exception("Failed to load hook %s" % (value))
+
+    def initAccountingServer(self):
+        """Instantiate the configured service and start the RPC server for it."""
+        service = instantiateImplementation(self.config.get("Accounting", "service"), self.config)
+
+        # Authenticated/encrypted transport is not implemented here: the
+        # [Security] authAndEncrypt check below is disabled.
+        #if boolean(self.config.get("Security", "authAndEncrypt")):
+        if False:
+            pass
+        else:
+            t = ThreadedServer(service=rpycservices.ManagerService, hostname='0.0.0.0', port=int(self.config.get('AccountingService', 'port')), auto_register=False)
+
+        t.logger.setLevel(logging.ERROR)
+        t.service.service = service
+        t.service._type = 'AccountingService'
+
+        debugConsole(globals())
+
+        try:
+            t.start()
+        except KeyboardInterrupt:
+            # Treat an interactive interrupt like a termination request.
+            self.handleSIGTERM(signal.SIGTERM, None)
+
+    @signalHandler(signal.SIGTERM)
+    def handleSIGTERM(self, signalNumber, stackFrame):
+        """Log the shutdown and exit the process with status 0."""
+        self.log.info('Exiting accounting server after receiving a SIGTERM signal')
+        sys.exit(0)
+
+def main():
+    """Load the configuration, set up publisher, CM client and logging, then run the server."""
+    (config, configFiles) = getConfig(["Accounting"])
+    publisher = instantiateImplementation(config.get("Accounting", "publisher"), config)
+    tashi.publisher = publisher
+    cmclient = createClient(config)
+    logging.config.fileConfig(configFiles)
+    accounting = Accounting(config, cmclient)
+
+    accounting.initAccountingServer()
+
+if __name__ == "__main__":
+    main()
diff --git a/src/tashi/accounting/accountingservice.py b/src/tashi/accounting/accountingservice.py
new file mode 100644
index 0000000..b1c035a
--- /dev/null
+++ b/src/tashi/accounting/accountingservice.py
@@ -0,0 +1,68 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import logging
+import threading
+import time
+
+from tashi import createClient
+
+class AccountingService(object):
+    """RPC service for the Accounting service"""
+
+    def __init__(self, config):
+        """Read the poll interval, create the cluster manager client and start polling.
+
+        The interval comes from the pollSleep option of the
+        [AccountingService] section and falls back to 600 seconds when it
+        cannot be read.
+        """
+        self.log = logging.getLogger(__name__)
+        self.log.setLevel(logging.INFO)
+
+        self.config = config
+
+        # XXXstroucki new python has fallback values
+        try:
+            self.pollSleep = self.config.getint("AccountingService", "pollSleep")
+        except Exception:
+            # The option could not be read: use the default interval.
+            self.pollSleep = 600
+
+        self.cm = createClient(config)
+        threading.Thread(target=self.__start).start()
+
+    # remote
+    def record(self, strings):
+        """Log every entry of strings at INFO level, prefixed with "Remote: "."""
+        for string in strings:
+            self.log.info("Remote: %s" % (string))
+
+    def __start(self):
+        """Poll the cluster manager in an endless loop, logging one accounting line per instance."""
+        while True:
+            try:
+                instances = self.cm.getInstances()
+                for instance in instances:
+                    # XXXstroucki this currently duplicates what the CM was doing.
+                    self.log.info('Accounting: id %d host %d vmId %d user %d cores %d memory %d' % (instance.id, instance.hostId, instance.vmId, instance.userId, instance.cores, instance.memory))
+            except Exception:
+                # Keep polling after a failed pass, but record why it failed.
+                self.log.warning("Accounting iteration failed", exc_info=True)
+
+            # wait to do the next iteration
+            time.sleep(self.pollSleep)
diff --git a/src/tashi/agents/primitive.py b/src/tashi/agents/primitive.py
index f2d62b7..99ef702 100755
--- a/src/tashi/agents/primitive.py
+++ b/src/tashi/agents/primitive.py
@@ -17,16 +17,11 @@
# specific language governing permissions and limitations
# under the License.
-from socket import gethostname
-import os
-import socket
-import sys
-import threading
import time
-import random
import logging.config
-from tashi.rpycservices.rpyctypes import *
+from tashi.rpycservices.rpyctypes import Errors, HostState, InstanceState, TashiException
+
from tashi.util import getConfig, createClient, instantiateImplementation, boolean
import tashi
@@ -274,10 +269,10 @@
# end for unassigned vms
- except TashiException, e:
+ except TashiException:
self.log.exception("Tashi exception")
- except Exception, e:
+ except Exception:
self.log.warning("Scheduler iteration failed")
diff --git a/src/tashi/client/tashi-client.py b/src/tashi/client/tashi-client.py
index 9060270..f36e719 100755
--- a/src/tashi/client/tashi-client.py
+++ b/src/tashi/client/tashi-client.py
@@ -211,6 +211,7 @@
'getSlots': (getSlots, None),
'getImages': (None, ['id', 'imageName', 'imageSize']),
'copyImage': (None, None),
+'createVm': (None, ['id', 'hostId', 'name', 'user', 'state', 'disk', 'memory', 'cores']),
'createMany': (createMany, ['id', 'hostId', 'name', 'user', 'state', 'disk', 'memory', 'cores']),
'destroyMany': (destroyMany, None),
'getVmLayout': (getVmLayout, ['id', 'name', 'state', 'instances', 'usedMemory', 'memory', 'usedCores', 'cores']),
@@ -575,7 +576,8 @@
if (type(res) == types.ListType):
makeTable(res, keys)
else:
- pprint(res)
+ makeTable([res], keys)
+
except IOError:
pass
except Exception, e:
diff --git a/src/tashi/clustermanager/clustermanager.py b/src/tashi/clustermanager/clustermanager.py
index 4cd8d0a..db61194 100755
--- a/src/tashi/clustermanager/clustermanager.py
+++ b/src/tashi/clustermanager/clustermanager.py
@@ -17,13 +17,9 @@
# specific language governing permissions and limitations
# under the License.
-import os
import sys
-import threading
import signal
import logging.config
-from getopt import getopt, GetoptError
-from ConfigParser import ConfigParser
from tashi.util import signalHandler, boolean, instantiateImplementation, getConfig, debugConsole
import tashi
diff --git a/src/tashi/clustermanager/clustermanagerservice.py b/src/tashi/clustermanager/clustermanagerservice.py
index 76fdc73..284ffcb 100644
--- a/src/tashi/clustermanager/clustermanagerservice.py
+++ b/src/tashi/clustermanager/clustermanagerservice.py
@@ -15,15 +15,13 @@
# specific language governing permissions and limitations
# under the License.
-from datetime import datetime
-from random import randint
-from socket import gethostname
import logging
import threading
import time
+from tashi.rpycservices import rpycservices
from tashi.rpycservices.rpyctypes import Errors, InstanceState, HostState, TashiException
-from tashi import boolean, convertExceptions, ConnectionManager, vmStates, timed, version, scrubString
+from tashi import boolean, ConnectionManager, vmStates, version, scrubString
class ClusterManagerService(object):
"""RPC service for the ClusterManager"""
@@ -42,6 +40,7 @@
self.dfs = dfs
self.convertExceptions = boolean(config.get('ClusterManagerService', 'convertExceptions'))
self.log = logging.getLogger(__name__)
+ self.log.setLevel(logging.ERROR)
self.hostLastContactTime = {}
#self.hostLastUpdateTime = {}
self.instanceLastContactTime = {}
@@ -51,27 +50,96 @@
self.maxMemory = int(self.config.get('ClusterManagerService', 'maxMemory'))
self.maxCores = int(self.config.get('ClusterManagerService', 'maxCores'))
self.allowDuplicateNames = boolean(self.config.get('ClusterManagerService', 'allowDuplicateNames'))
- now = self.__now()
+
+ self.accountingHost = None
+ self.accountingPort = None
+ try:
+ self.accountingHost = self.config.get('ClusterManagerService', 'accountingHost')
+ self.accountingPort = self.config.getint('ClusterManagerService', 'accountingPort')
+ except:
+ pass
+
+ self.__initAccounting()
+ self.__initCluster()
+
+ threading.Thread(target=self.__monitorCluster).start()
+
+ def __initAccounting(self):
+ self.accountBuffer = []
+ self.accountLines = 0
+ self.accountingClient = None
+ try:
+ if (self.accountingHost is not None) and \
+ (self.accountingPort is not None):
+ self.accountingClient=rpycservices.client(self.accountingHost, self.accountingPort)
+ except:
+ self.log.exception("Could not init accounting")
+
+ def __initCluster(self):
+ # initialize state of VMs if restarting
for instance in self.data.getInstances().itervalues():
instanceId = instance.id
instance = self.data.acquireInstance(instanceId)
instance.decayed = False
if instance.hostId is None:
- self.stateTransition(instance, None, InstanceState.Pending)
+ self.__stateTransition(instance, None, InstanceState.Pending)
else:
- self.stateTransition(instance, None, InstanceState.Orphaned)
+ self.__stateTransition(instance, None, InstanceState.Orphaned)
self.data.releaseInstance(instance)
+
+ # initialize state of hosts if restarting
for host in self.data.getHosts().itervalues():
hostId = host.id
host = self.data.acquireHost(hostId)
host.up = False
host.decayed = False
self.data.releaseHost(host)
- threading.Thread(target=self.monitorCluster).start()
- def stateTransition(self, instance, old, cur):
+
+
+ def __ACCOUNTFLUSH(self):
+ try:
+ if (self.accountingClient is not None):
+ self.accountingClient.record(self.accountBuffer)
+ self.accountLines = 0
+ self.accountBuffer = []
+ except:
+ self.log.exception("Failed to flush accounting data")
+
+
+ def __ACCOUNT(self, text, instance=None, host=None):
+ now = self.__now()
+ instanceText = None
+ hostText = None
+
+ if instance is not None:
+ try:
+ instanceText = 'Instance(%s)' % (instance)
+ except:
+ self.log.exception("Invalid instance data")
+
+ if host is not None:
+ try:
+ hostText = "Host(%s)" % (host)
+ except:
+ self.log.exception("Invalid host data")
+
+ secondary = ','.join(filter(None, (hostText, instanceText)))
+
+ line = "%s|%s|%s" % (now, text, secondary)
+
+ self.accountBuffer.append(line)
+ self.accountLines += 1
+
+ # XXXstroucki think about autoflush by time
+ if (self.accountLines > 0):
+ self.__ACCOUNTFLUSH()
+
+
+
+ def __stateTransition(self, instance, old, cur):
if (old and instance.state != old):
raise TashiException(d={'errno':Errors.IncorrectVmState,'msg':"VmState is not %s - it is %s" % (vmStates[old], vmStates[instance.state])})
if (instance.state == cur):
@@ -104,7 +172,7 @@
instance = self.data.acquireInstance(instanceId)
if instance.hostId == host.id:
instance.decayed = True
- self.stateTransition(instance, None, InstanceState.Orphaned)
+ self.__stateTransition(instance, None, InstanceState.Orphaned)
self.data.releaseInstance(instance)
@@ -148,10 +216,16 @@
for hostId in self.hostLastContactTime.keys():
#self.log.warning("iterate %d" % hostId)
host = self.data.acquireHost(hostId)
- if (self.hostLastContactTime[hostId] < (self.__now() - self.allowDecayed)):
+ # XXXstroucki: timing has changed with the message
+ # buffering in the NM, so this wasn't being run
+ # anymore because the time check was passing.
+ # I should think a bit more about this, but
+ # the "if True" is probably appropriate.
+ #if (self.hostLastContactTime[hostId] < (self.__now() - self.allowDecayed)):
+ if True:
host.decayed = True
- self.log.info('Fetching state from host %s because it is decayed' % (host.name))
+ self.log.debug('Fetching state from host %s because it is decayed' % (host.name))
myInstancesThisHost = [i for i in myInstances.values() if i.hostId == host.id]
@@ -195,6 +269,23 @@
# iterate through all VMs I believe are active
for instanceId in self.instanceLastContactTime.keys():
+ # Look up the VM for this id before testing its state;
+ # otherwise the check below would inspect whatever was
+ # bound to "instance" before this iteration, if anything.
+ try:
+ instance = self.data.getInstance(instanceId)
+ except Exception:
+ continue
+
+ # Don't query non-running VMs. eg. if a VM
+ # is suspended, and has no host, then there's
+ # no one to ask
+ if instance.state != InstanceState.Running and \
+ instance.state != InstanceState.Activating and \
+ instance.state != InstanceState.Orphaned:
+ continue
+
+ # XXXstroucki should lock instance here?
if (self.instanceLastContactTime[instanceId] < (self.__now() - self.allowDecayed)):
try:
instance = self.data.acquireInstance(instanceId)
@@ -202,7 +285,7 @@
continue
instance.decayed = True
- self.log.info('Fetching state on instance %s because it is decayed' % (instance.name))
+ self.log.debug('Fetching state on instance %s because it is decayed' % (instance.name))
if instance.hostId is None: raise AssertionError
# XXXstroucki check if host is down?
@@ -225,21 +308,6 @@
self.data.releaseInstance(instance)
-
- def monitorCluster(self):
- while True:
- sleepFor = min(self.expireHostTime, self.allowDecayed)
-
- try:
- self.__checkHosts()
- self.__checkInstances()
- except:
- self.log.exception('monitorCluster iteration failed')
- # XXXrgass too chatty. Remove
- #self.log.info("Sleeping for %d seconds" % sleepFor)
- time.sleep(sleepFor)
-
-
def normalize(self, instance):
instance.id = None
instance.vmId = None
@@ -274,12 +342,14 @@
instance = self.normalize(instance)
instance = self.data.registerInstance(instance)
self.data.releaseInstance(instance)
+ self.__ACCOUNT("CM VM REQUEST", instance=instance)
return instance
def shutdownVm(self, instanceId):
instance = self.data.acquireInstance(instanceId)
- self.stateTransition(instance, InstanceState.Running, InstanceState.ShuttingDown)
+ self.__stateTransition(instance, InstanceState.Running, InstanceState.ShuttingDown)
self.data.releaseInstance(instance)
+ self.__ACCOUNT("CM VM SHUTDOWN", instance=instance)
hostname = self.data.getHost(instance.hostId).name
try:
self.proxy[hostname].shutdownVm(instance.vmId)
@@ -291,14 +361,17 @@
def destroyVm(self, instanceId):
instance = self.data.acquireInstance(instanceId)
if (instance.state is InstanceState.Pending or instance.state is InstanceState.Held):
+ self.__ACCOUNT("CM VM DESTROY UNSTARTED", instance=instance)
self.data.removeInstance(instance)
elif (instance.state is InstanceState.Activating):
- self.stateTransition(instance, InstanceState.Activating, InstanceState.Destroying)
+ self.__ACCOUNT("CM VM DESTROY STARTING", instance=instance)
+ self.__stateTransition(instance, InstanceState.Activating, InstanceState.Destroying)
self.data.releaseInstance(instance)
else:
# XXXstroucki: This is a problem with keeping
# clean state.
- self.stateTransition(instance, None, InstanceState.Destroying)
+ self.__ACCOUNT("CM VM DESTROY", instance=instance)
+ self.__stateTransition(instance, None, InstanceState.Destroying)
if instance.hostId is None:
self.data.removeInstance(instance)
else:
@@ -316,8 +389,9 @@
def suspendVm(self, instanceId):
instance = self.data.acquireInstance(instanceId)
- self.stateTransition(instance, InstanceState.Running, InstanceState.Suspending)
+ self.__stateTransition(instance, InstanceState.Running, InstanceState.Suspending)
self.data.releaseInstance(instance)
+ self.__ACCOUNT("CM VM SUSPEND", instance=instance)
hostname = self.data.getHost(instance.hostId).name
destination = "suspend/%d_%s" % (instance.id, instance.name)
try:
@@ -329,14 +403,16 @@
def resumeVm(self, instanceId):
instance = self.data.acquireInstance(instanceId)
- self.stateTransition(instance, InstanceState.Suspended, InstanceState.Pending)
+ self.__stateTransition(instance, InstanceState.Suspended, InstanceState.Pending)
source = "suspend/%d_%s" % (instance.id, instance.name)
instance.hints['__resume_source'] = source
self.data.releaseInstance(instance)
+ self.__ACCOUNT("CM VM RESUME", instance=instance)
return instance
def migrateVm(self, instanceId, targetHostId):
instance = self.data.acquireInstance(instanceId)
+ self.__ACCOUNT("CM VM MIGRATE", instance=instance)
try:
# FIXME: should these be acquire/release host?
targetHost = self.data.getHost(targetHostId)
@@ -345,7 +421,7 @@
except:
self.data.releaseInstance(instance)
raise
- self.stateTransition(instance, InstanceState.Running, InstanceState.MigratePrep)
+ self.__stateTransition(instance, InstanceState.Running, InstanceState.MigratePrep)
self.data.releaseInstance(instance)
try:
# Prepare the target
@@ -353,16 +429,16 @@
self.proxy[sourceHost.name].prepSourceVm(instance.vmId)
self.log.info("migrateVm: Calling prepReceiveVm on target host %s" % targetHost.name)
cookie = self.proxy[targetHost.name].prepReceiveVm(instance, sourceHost)
- except Exception, e:
+ except Exception:
self.log.exception('prepReceiveVm failed')
raise
instance = self.data.acquireInstance(instance.id)
- self.stateTransition(instance, InstanceState.MigratePrep, InstanceState.MigrateTrans)
+ self.__stateTransition(instance, InstanceState.MigratePrep, InstanceState.MigrateTrans)
self.data.releaseInstance(instance)
try:
# Send the VM
self.proxy[sourceHost.name].migrateVm(instance.vmId, targetHost, cookie)
- except Exception, e:
+ except Exception:
self.log.exception('migrateVm failed')
raise
try:
@@ -374,15 +450,16 @@
try:
# Notify the target
vmId = self.proxy[targetHost.name].receiveVm(instance, cookie)
- except Exception, e:
+ except Exception:
self.log.exception('receiveVm failed')
raise
return
def pauseVm(self, instanceId):
instance = self.data.acquireInstance(instanceId)
- self.stateTransition(instance, InstanceState.Running, InstanceState.Pausing)
+ self.__stateTransition(instance, InstanceState.Running, InstanceState.Pausing)
self.data.releaseInstance(instance)
+ self.__ACCOUNT("CM VM PAUSE", instance=instance)
hostname = self.data.getHost(instance.hostId).name
try:
self.proxy[hostname].pauseVm(instance.vmId)
@@ -390,14 +467,15 @@
self.log.exception('pauseVm failed on host %s with vmId %d' % (hostname, instance.vmId))
raise
instance = self.data.acquireInstance(instanceId)
- self.stateTransition(instance, InstanceState.Pausing, InstanceState.Paused)
+ self.__stateTransition(instance, InstanceState.Pausing, InstanceState.Paused)
self.data.releaseInstance(instance)
return
def unpauseVm(self, instanceId):
instance = self.data.acquireInstance(instanceId)
- self.stateTransition(instance, InstanceState.Paused, InstanceState.Unpausing)
+ self.__stateTransition(instance, InstanceState.Paused, InstanceState.Unpausing)
self.data.releaseInstance(instance)
+ self.__ACCOUNT("CM VM UNPAUSE", instance=instance)
hostname = self.data.getHost(instance.hostId).name
try:
self.proxy[hostname].unpauseVm(instance.vmId)
@@ -405,7 +483,7 @@
self.log.exception('unpauseVm failed on host %s with vmId %d' % (hostname, instance.vmId))
raise
instance = self.data.acquireInstance(instanceId)
- self.stateTransition(instance, InstanceState.Unpausing, InstanceState.Running)
+ self.__stateTransition(instance, InstanceState.Unpausing, InstanceState.Running)
self.data.releaseInstance(instance)
return
@@ -440,6 +518,7 @@
def vmmSpecificCall(self, instanceId, arg):
instance = self.data.getInstance(instanceId)
hostname = self.data.getHost(instance.hostId).name
+ self.__ACCOUNT("CM VM SPECIFIC CALL", instance=instance)
try:
res = self.proxy[hostname].vmmSpecificCall(instance.vmId, arg)
except Exception:
@@ -466,8 +545,7 @@
if oldHost.up == False:
self.__upHost(oldHost)
- self.hostLastContactTime[host.id] = time.time()
- #self.hostLastUpdateTime[host.id] = time.time()
+ self.hostLastContactTime[host.id] = self.__now()
oldHost.version = host.version
oldHost.memory = host.memory
oldHost.cores = host.cores
@@ -479,10 +557,8 @@
oldHost.state = HostState.Normal
# let the host communicate what it is running
- # XXXrgass - This is too chatty for the console, I think we should remove this.
- # XXXstroucki - My install depends on this, but I output to log files. This should be handled by a separate accounting server in future.
+ # and note that the information is not stale
for instance in instances:
- self.log.info('Accounting: id %d host %d vmId %d user %d cores %d memory %d' % (instance.id, host.id, instance.vmId, instance.userId, instance.cores, instance.memory))
self.instanceLastContactTime.setdefault(instance.id, 0)
self.data.releaseHost(oldHost)
@@ -500,8 +576,9 @@
self.log.exception("Could not acquire instance")
raise
- self.instanceLastContactTime[instanceId] = time.time()
+ self.instanceLastContactTime[instanceId] = self.__now()
oldInstance.decayed = False
+ self.__ACCOUNT("CM VM UPDATE", instance=oldInstance)
if (instance.state == InstanceState.Exited):
# determine why a VM has exited
@@ -509,7 +586,7 @@
if (oldInstance.state not in [InstanceState.ShuttingDown, InstanceState.Destroying, InstanceState.Suspending]):
self.log.warning('Unexpected exit on %s of instance %s (vmId %d)' % (hostname, oldInstance.name, oldInstance.vmId))
if (oldInstance.state == InstanceState.Suspending):
- self.stateTransition(oldInstance, InstanceState.Suspending, InstanceState.Suspended)
+ self.__stateTransition(oldInstance, InstanceState.Suspending, InstanceState.Suspended)
oldInstance.hostId = None
oldInstance.vmId = None
self.data.releaseInstance(oldInstance)
@@ -552,13 +629,14 @@
self.data.releaseHost(dataHost)
instance = self.data.acquireInstance(instanceId)
+ self.__ACCOUNT("CM VM ACTIVATE", instance=instance)
if ('__resume_source' in instance.hints):
- self.stateTransition(instance, InstanceState.Pending, InstanceState.Resuming)
+ self.__stateTransition(instance, InstanceState.Pending, InstanceState.Resuming)
else:
# XXXstroucki should held VMs be continually tried? Or be explicitly set back to pending?
- #self.stateTransition(instance, InstanceState.Pending, InstanceState.Activating)
- self.stateTransition(instance, None, InstanceState.Activating)
+ #self.__stateTransition(instance, InstanceState.Pending, InstanceState.Activating)
+ self.__stateTransition(instance, None, InstanceState.Activating)
instance.hostId = host.id
self.data.releaseInstance(instance)
@@ -568,14 +646,14 @@
vmId = self.proxy[host.name].resumeVm(instance, instance.hints['__resume_source'])
else:
vmId = self.proxy[host.name].instantiateVm(instance)
- except Exception, e:
+ except Exception:
instance = self.data.acquireInstance(instanceId)
if (instance.state is InstanceState.Destroying): # Special case for if destroyVm is called during initialization and initialization fails
self.data.removeInstance(instance)
else:
# XXXstroucki what can we do about pending hosts in the scheduler?
# put them at the end of the queue and keep trying?
- self.stateTransition(instance, None, InstanceState.Held)
+ self.__stateTransition(instance, None, InstanceState.Held)
instance.hostId = None
self.data.releaseInstance(instance)
return "failure"
@@ -594,7 +672,7 @@
else:
if ('__resume_source' not in instance.hints):
# XXXstroucki should we just wait for NM to update?
- #self.stateTransition(instance, InstanceState.Activating, InstanceState.Running)
+ #self.__stateTransition(instance, InstanceState.Activating, InstanceState.Running)
pass
self.data.releaseInstance(instance)
@@ -606,9 +684,41 @@
self.log.info("Host %s is already registered, it was updated now" % hostname)
else:
self.log.info("A host was registered - hostname: %s, version: %s, memory: %s, cores: %s" % (hostname, version, memory, cores))
+
+ try:
+ host = self.data.getHost(hostId)
+ self.__ACCOUNT("CM HOST REGISTER", host=host)
+ except:
+ self.log.warning("Failed to lookup host %s" % hostId)
+
return hostId
def unregisterHost(self, hostId):
+ try:
+ host = self.data.getHost(hostId)
+ self.__ACCOUNT("CM HOST UNREGISTER", host=host)
+ except:
+ self.log.warning("Failed to lookup host %s" % hostId)
+ return
+
self.data.unregisterHost(hostId)
self.log.info("Host %s was unregistered" % hostId)
return
+
+ # service thread
+ def __monitorCluster(self):
+ while True:
+ sleepFor = min(self.expireHostTime, self.allowDecayed)
+
+ try:
+ self.__checkHosts()
+ self.__checkInstances()
+ except:
+ self.log.exception('monitorCluster iteration failed')
+ # XXXrgass too chatty. Remove
+ # XXXstroucki the risk is that a deadlock in obtaining
+ # data could prevent this loop from continuing.
+ #self.log.info("Sleeping for %d seconds" % sleepFor)
+ time.sleep(sleepFor)
+
+
diff --git a/src/tashi/clustermanager/data/fromconfig.py b/src/tashi/clustermanager/data/fromconfig.py
index 189c342..8511a07 100644
--- a/src/tashi/clustermanager/data/fromconfig.py
+++ b/src/tashi/clustermanager/data/fromconfig.py
@@ -20,7 +20,7 @@
import os
import ConfigParser
-from tashi.rpycservices.rpyctypes import *
+from tashi.rpycservices.rpyctypes import Host, Network, User, TashiException, Errors, HostState
from tashi.clustermanager.data import DataInterface
class FromConfig(DataInterface):
@@ -109,7 +109,7 @@
def releaseInstance(self, instance):
try:
if (instance.id not in self.instances): # MPR: should never be true, but good to check
- raise TashiException(d={'errno':Errors.NoSuchInstanceId,'msg':"No such instanceId - %d" % (instanceId)})
+ raise TashiException(d={'errno':Errors.NoSuchInstanceId,'msg':"No such instanceId - %d" % (instance.id)})
finally:
self.releaseLock(instance._lock)
diff --git a/src/tashi/clustermanager/data/pickled.py b/src/tashi/clustermanager/data/pickled.py
index a74fb3e..b3a6e03 100644
--- a/src/tashi/clustermanager/data/pickled.py
+++ b/src/tashi/clustermanager/data/pickled.py
@@ -18,7 +18,7 @@
import cPickle
import os
import threading
-from tashi.rpycservices.rpyctypes import *
+from tashi.rpycservices.rpyctypes import Instance, Host
from tashi.clustermanager.data import FromConfig, DataInterface
class Pickled(FromConfig):
diff --git a/src/tashi/clustermanager/data/sql.py b/src/tashi/clustermanager/data/sql.py
index 7a99226..64e5681 100644
--- a/src/tashi/clustermanager/data/sql.py
+++ b/src/tashi/clustermanager/data/sql.py
@@ -17,11 +17,9 @@
import logging
import threading
-import time
-import types
# XXXstroucki getImages needs os?
import os
-from tashi.rpycservices.rpyctypes import *
+from tashi.rpycservices.rpyctypes import Errors, Network, Host, User, Instance, TashiException, LocalImages, DiskConfiguration, NetworkConfiguration
from tashi.clustermanager.data.datainterface import DataInterface
from tashi.util import stringPartition, boolean, instantiateImplementation, humanReadable
@@ -45,7 +43,7 @@
self.password = self.config.get('SQL', 'password')
self.conn = MySQLdb.connect(host=host, user=user, passwd=self.password, db=db)
else:
- raise ValueException, 'Unknown SQL database engine by URI: %s' % (self.uri)
+ raise TashiException, 'Unknown SQL database engine by URI: %s' % (self.uri)
self.instanceOrder = ['id', 'vmId', 'hostId', 'decayed', 'state', 'userId', 'name', 'cores', 'memory', 'disks', 'nics', 'hints']
self.hostOrder = ['id', 'name', 'up', 'decayed', 'state', 'memory', 'cores', 'version']
@@ -319,7 +317,7 @@
for r in res:
if r[1] == hostname:
id = r[0]
- print "Host %s already registered, update will be done" % id
+ self.log.warning("Host %s already registered, update will be done" % id)
s = ""
host = Host(d={'id': id, 'up': 0, 'decayed': 0, 'state': 1, 'name': hostname, 'memory':memory, 'cores': cores, 'version':version})
l = self.makeHostList(host)
diff --git a/src/tashi/connectionmanager.py b/src/tashi/connectionmanager.py
index b2bcb15..5eeae6c 100644
--- a/src/tashi/connectionmanager.py
+++ b/src/tashi/connectionmanager.py
@@ -15,9 +15,8 @@
# specific language governing permissions and limitations
# under the License.
-import rpyc
from tashi.rpycservices import rpycservices
-from tashi.rpycservices.rpyctypes import *
+#from tashi.rpycservices.rpyctypes import *
class ConnectionManager(object):
def __init__(self, username, password, port, timeout=10000.0):
diff --git a/src/tashi/dfs/vfs.py b/src/tashi/dfs/vfs.py
index 383e363..d039335 100644
--- a/src/tashi/dfs/vfs.py
+++ b/src/tashi/dfs/vfs.py
@@ -27,52 +27,67 @@
DfsInterface.__init__(self, config)
self.prefix = self.config.get("Vfs", "prefix")
-# why do these three need to be separate?
+ def __dfsToReal(self, dfspath):
+ realpath = os.path.join(self.prefix, dfspath)
+ return realpath
+
def copyTo(self, localSrc, dst):
- shutil.copy(localSrc, os.path.join(self.prefix, dst))
-# just assuming this works
+ realdest = self.__dfsToReal(dst)
+ shutil.copy(localSrc, realdest)
+ # just assuming this works
return None
def copyFrom(self, src, localDst):
- shutil.copy(os.path.join(self.prefix, src), localDst)
-# just assuming this works
+ realsrc = self.__dfsToReal(src)
+ shutil.copy(realsrc, localDst)
+ # just assuming this works
return None
def copy(self, src, dst):
- shutil.copy(os.path.join(self.prefix, src),
- os.path.join(self.prefix, dst))
-# just assuming this works
+ realsrc = self.__dfsToReal(src)
+ realdst = self.__dfsToReal(dst)
+ shutil.copy(realsrc, realdst)
+ # just assuming this works
return None
def list(self, path):
try:
- return os.listdir(os.path.join(self.prefix, path))
+ realpath = self.__dfsToReal(path)
+ return os.listdir(realpath)
except OSError, e:
+ # XXXstroucki error 20 = ENOTDIR
if (e.errno == 20):
return [path.split('/')[-1]]
else:
raise
def stat(self, path):
- return os.stat(os.path.join(self.prefix, path))
+ realpath = self.__dfsToReal(path)
+ return os.stat(realpath)
def move(self, src, dst):
- shutil.move(os.path.join(self.prefix, src),
- os.path.join(self.prefix, dst))
-# just assuming this works
+ realsrc = self.__dfsToReal(src)
+ realdst = self.__dfsToReal(dst)
+ shutil.move(realsrc, realdst)
+ # just assuming this works
return None
def mkdir(self, path):
- return os.mkdir(os.path.join(self.prefix, path))
+ realpath = self.__dfsToReal(path)
+ return os.mkdir(realpath)
def unlink(self, path):
- return os.unlink(os.path.join(self.prefix, path))
+ realpath = self.__dfsToReal(path)
+ return os.unlink(realpath)
def rmdir(self, path):
- return os.rmdir(os.path.join(self.prefix, path))
+ realpath = self.__dfsToReal(path)
+ return os.rmdir(realpath)
def open(self, path, perm):
- return open(os.path.join(self.prefix, path), perm)
+ realpath = self.__dfsToReal(path)
+ return open(realpath, perm)
def getLocalHandle(self, path):
- return os.path.join(self.prefix, path)
+ realpath = self.__dfsToReal(path)
+ return realpath
diff --git a/src/tashi/nodemanager/nodemanager.py b/src/tashi/nodemanager/nodemanager.py
index 166262d..66d2d5b 100755
--- a/src/tashi/nodemanager/nodemanager.py
+++ b/src/tashi/nodemanager/nodemanager.py
@@ -22,7 +22,6 @@
import sys
from tashi.util import instantiateImplementation, getConfig, debugConsole, signalHandler
-from tashi import ConnectionManager
import tashi
from tashi import boolean
diff --git a/src/tashi/nodemanager/nodemanagerservice.py b/src/tashi/nodemanager/nodemanagerservice.py
index 6ad86c7..c493ac9 100755
--- a/src/tashi/nodemanager/nodemanagerservice.py
+++ b/src/tashi/nodemanager/nodemanagerservice.py
@@ -15,17 +15,14 @@
# specific language governing permissions and limitations
# under the License.
-import cPickle
import logging
-import os
import socket
-import sys
import threading
import time
-from tashi.rpycservices.rpyctypes import Host, HostState, InstanceState, TashiException, Errors, Instance
-from tashi.nodemanager import RPC
-from tashi import boolean, vmStates, logged, ConnectionManager, timed
+from tashi.rpycservices import rpycservices
+from tashi.rpycservices.rpyctypes import InstanceState, TashiException, Errors, Instance
+from tashi import boolean, vmStates, ConnectionManager
import tashi
@@ -50,257 +47,150 @@
self.log = logging.getLogger(__file__)
self.convertExceptions = boolean(config.get('NodeManagerService', 'convertExceptions'))
self.registerFrequency = float(config.get('NodeManagerService', 'registerFrequency'))
- self.infoFile = self.config.get('NodeManagerService', 'infoFile')
self.statsInterval = float(self.config.get('NodeManagerService', 'statsInterval'))
- self.id = None
+ self.registerHost = boolean(config.get('NodeManagerService', 'registerHost'))
+ try:
+ self.cm = ConnectionManager(self.username, self.password, self.cmPort)[self.cmHost]
+ except:
+ self.log.exception("Could not connect to CM")
+ return
+
+ self.accountingHost = None
+ self.accountingPort = None
+ try:
+ self.accountingHost = self.config.get('NodeManagerService', 'accountingHost')
+ self.accountingPort = self.config.getint('NodeManagerService', 'accountingPort')
+ except:
+ pass
+
self.notifyCM = []
- self.loadVmInfo()
- vmList = self.vmm.listVms()
- for vmId in vmList:
- if (vmId not in self.instances):
- self.log.warning('vmcontrol backend reports additional vmId %d' % (vmId))
- self.instances[vmId] = Instance(d={'vmId':vmId,'id':-1})
- for vmId in self.instances.keys():
- if (vmId not in vmList):
- self.log.warning('vmcontrol backend does not report %d' % (vmId))
- self.vmStateChange(vmId, None, InstanceState.Exited)
- self.registerHost()
- threading.Thread(target=self.backupVmInfoAndFlushNotifyCM).start()
- threading.Thread(target=self.registerWithClusterManager).start()
- threading.Thread(target=self.statsThread).start()
+
+ self.__initAccounting()
+
+ self.id = None
+ # XXXstroucki this fn could be in this level maybe?
+ self.host = self.vmm.getHostInfo(self)
+
+ # populate self.instances
+ self.__loadVmInfo()
+
+ self.__registerHost()
+
+ self.id = self.cm.registerNodeManager(self.host, self.instances.values())
+
+ # XXXstroucki cut cross check for NM/VMM state
+
+ # start service threads
+ threading.Thread(target=self.__registerWithClusterManager).start()
+ threading.Thread(target=self.__statsThread).start()
- def loadVmInfo(self):
+ def __initAccounting(self):
+ self.accountBuffer = []
+ self.accountLines = 0
+ self.accountingClient = None
+ try:
+ if (self.accountingHost is not None) and \
+ (self.accountingPort is not None):
+ self.accountingClient=rpycservices.client(self.accountingHost, self.accountingPort)
+ except:
+ self.log.exception("Could not init accounting")
+
+ def __loadVmInfo(self):
try:
self.instances = self.vmm.getInstances()
- except Exception, e:
+ except Exception:
self.log.exception('Failed to obtain VM info')
self.instances = {}
-
- def saveVmInfo(self):
+
+ # send data to CM
+ # XXXstroucki adapt this for accounting?
+ def __flushNotifyCM(self):
+ start = time.time()
+ # send data to CM, adding message to buffer if
+ # it fails
try:
- data = cPickle.dumps(self.instances)
- f = open(self.infoFile, "w")
- f.write(data)
- f.close()
- except Exception, e:
- self.log.exception('Failed to save VM info to %s' % (self.infoFile))
-
- def vmStateChange(self, vmId, old, cur):
- instance = self.getInstance(vmId)
- if (old and instance.state != old):
- self.log.warning('VM state was %s, call indicated %s' % (vmStates[instance.state], vmStates[old]))
- if (cur == InstanceState.Exited):
- del self.instances[vmId]
- return True
-
- if (instance.state == cur):
- # Don't do anything if state is what it should be
- return True
-
- instance.state = cur
- newInst = Instance(d={'state':cur})
- success = lambda: None
- try:
- cm = ConnectionManager(self.username, self.password, self.cmPort)[self.cmHost]
- cm.vmUpdate(instance.id, newInst, old)
- except Exception, e:
- self.log.exception('RPC failed for vmUpdate on CM')
- self.notifyCM.append((instance.id, newInst, old, success))
- else:
- success()
- return True
-
- def backupVmInfoAndFlushNotifyCM(self):
- cm = None
- cmUseCount = 0
- while True:
- if cmUseCount > 10 or cm is None:
- try:
- # XXXstroucki hope rpyc handles destruction
- cm = ConnectionManager(self.username, self.password, self.cmPort)[self.cmHost]
- cmUseCount = 0
- except Exception, e:
- self.log.warning("Could not get a handle to the clustermanager")
- time.sleep(60)
- continue
-
- cmUseCount = cmUseCount + 1
- start = time.time()
+ notifyCM = []
try:
- self.saveVmInfo()
- except Exception, e:
- self.log.exception('Failed to save VM info')
- try:
- notifyCM = []
- try:
- while (len(self.notifyCM) > 0):
- (instanceId, newInst, old, success) = self.notifyCM.pop(0)
- try:
- cm.vmUpdate(instanceId, newInst, old)
- except TashiException, e:
- notifyCM.append((instanceId, newInst, old, success))
- if (e.errno != Errors.IncorrectVmState):
- raise
- except:
- notifyCM.append((instanceId, newInst, old, success))
+ while (len(self.notifyCM) > 0):
+ value = self.notifyCM.pop(0)
+ (instanceId, newInst, old, success) = value
+ try:
+ self.cm.vmUpdate(instanceId, newInst, old)
+ except TashiException, e:
+ notifyCM.append((instanceId, newInst, old, success))
+ if (e.errno != Errors.IncorrectVmState):
raise
- else:
- success()
- finally:
- self.notifyCM = self.notifyCM + notifyCM
- except Exception, e:
- self.log.exception('Failed to register with the CM')
- toSleep = start - time.time() + self.registerFrequency
- if (toSleep > 0):
- time.sleep(toSleep)
-
- def registerWithClusterManager(self):
- cm = None
- cmUseCount = 0
+ except:
+ notifyCM.append((instanceId, newInst, old, success))
+ raise
+ else:
+ success()
+ finally:
+ if len(notifyCM) > 0:
+ self.notifyCM.extend(notifyCM) # re-queue each failed (instanceId, newInst, old, success) tuple individually
+ except Exception, e:
+ self.log.exception('Failed to send data to the CM')
+
+ #toSleep = start - time.time() + self.registerFrequency
+ #if (toSleep > 0):
+ #time.sleep(toSleep)
+
+ def __ACCOUNTFLUSH(self):
+ try:
+ if (self.accountingClient is not None):
+ self.accountingClient.record(self.accountBuffer)
+ self.accountLines = 0
+ self.accountBuffer = []
+ except:
+ self.log.exception("Failed to flush accounting data")
+
+
+ def __ACCOUNT(self, text, instance=None, host=None):
+ now = time.time()
+ instanceText = None
+ hostText = None
+
+ if instance is not None:
+ try:
+ instanceText = 'Instance(%s)' % (instance)
+ except:
+ self.log.exception("Invalid instance data")
+
+ if host is not None:
+ try:
+ hostText = "Host(%s)" % (host)
+ except:
+ self.log.exception("Invalid host data")
+
+ secondary = ','.join(filter(None, (hostText, instanceText)))
+
+ line = "%s|%s|%s" % (now, text, secondary)
+
+ self.accountBuffer.append(line)
+ self.accountLines += 1
+
+ # XXXstroucki think about force flush every so often
+ if (self.accountLines > 0):
+ self.__ACCOUNTFLUSH()
+
+
+ # service thread function
+ def __registerWithClusterManager(self):
while True:
- if cmUseCount > 10 or cm is None:
- try:
- cm = ConnectionManager(self.username, self.password, self.cmPort)[self.cmHost]
- cmUseCount = 0
- except Exception, e:
- self.log.warning("Could not get a handle to the clustermanager")
- time.sleep(60)
- continue
- cmUseCount = cmUseCount + 1
+ #self.__ACCOUNT("TESTING")
start = time.time()
try:
- host = self.vmm.getHostInfo(self)
instances = self.instances.values()
- #import pprint
- #self.log.warning("Instances: " + pprint.saferepr(instances))
- self.id = cm.registerNodeManager(host, instances)
- except Exception, e:
+ self.id = self.cm.registerNodeManager(self.host, instances)
+ except Exception:
self.log.exception('Failed to register with the CM')
+
toSleep = start - time.time() + self.registerFrequency
if (toSleep > 0):
time.sleep(toSleep)
-
- def getInstance(self, vmId):
- instance = self.instances.get(vmId, None)
- if (instance is None):
- raise TashiException(d={'errno':Errors.NoSuchVmId,'msg':"There is no vmId %d on this host" % (vmId)})
- return instance
-
- def instantiateVm(self, instance):
- vmId = self.vmm.instantiateVm(instance)
- instance.vmId = vmId
- instance.state = InstanceState.Running
- self.instances[vmId] = instance
- return vmId
-
- def suspendVm(self, vmId, destination):
- instance = self.getInstance(vmId)
- instance.state = InstanceState.Suspending
- threading.Thread(target=self.vmm.suspendVm, args=(vmId, destination)).start()
-
- def resumeVmHelper(self, instance, name):
- self.vmm.resumeVmHelper(instance, name)
- instance.state = InstanceState.Running
- newInstance = Instance(d={'id':instance.id,'state':instance.state})
- success = lambda: None
- cm = ConnectionManager(self.username, self.password, self.cmPort)[self.cmHost]
- try:
- cm.vmUpdate(newInstance.id, newInstance, InstanceState.Resuming)
- except Exception, e:
- self.log.exception('vmUpdate failed in resumeVmHelper')
- self.notifyCM.append((newInstance.id, newInstance, InstanceState.Resuming, success))
- else:
- success()
-
- def resumeVm(self, instance, name):
- instance.state = InstanceState.Resuming
- instance.hostId = self.id
- try:
- instance.vmId = self.vmm.resumeVm(instance, name)
- self.instances[instance.vmId] = instance
- threading.Thread(target=self.resumeVmHelper, args=(instance, name)).start()
- except:
- self.log.exception('resumeVm failed')
- raise TashiException(d={'errno':Errors.UnableToResume,'msg':"resumeVm failed on the node manager"})
- return instance.vmId
-
- def prepReceiveVm(self, instance, source):
- instance.vmId = -1
- transportCookie = self.vmm.prepReceiveVm(instance, source.name)
- return transportCookie
- def prepSourceVm(self, vmId):
- instance = self.getInstance(vmId)
- instance.state = InstanceState.MigratePrep
-
- def migrateVmHelper(self, instance, target, transportCookie):
- self.vmm.migrateVm(instance.vmId, target.name, transportCookie)
- del self.instances[instance.vmId]
-
- def migrateVm(self, vmId, target, transportCookie):
- instance = self.getInstance(vmId)
- instance.state = InstanceState.MigrateTrans
- threading.Thread(target=self.migrateVmHelper, args=(instance, target, transportCookie)).start()
- return
-
- def receiveVmHelper(self, instance, transportCookie):
- cm = ConnectionManager(self.username, self.password, self.cmPort)[self.cmHost]
- vmId = self.vmm.receiveVm(transportCookie)
- instance.state = InstanceState.Running
- instance.hostId = self.id
- instance.vmId = vmId
- self.instances[vmId] = instance
- newInstance = Instance(d={'id':instance.id,'state':instance.state,'vmId':instance.vmId,'hostId':instance.hostId})
- success = lambda: None
- try:
- cm.vmUpdate(newInstance.id, newInstance, InstanceState.MigrateTrans)
- except Exception, e:
- self.log.exception('vmUpdate failed in receiveVmHelper')
- self.notifyCM.append((newInstance.id, newInstance, InstanceState.MigrateTrans, success))
- else:
- success()
-
- def receiveVm(self, instance, transportCookie):
- instance.state = InstanceState.MigrateTrans
- threading.Thread(target=self.receiveVmHelper, args=(instance, transportCookie)).start()
- return
-
- def pauseVm(self, vmId):
- instance = self.getInstance(vmId)
- instance.state = InstanceState.Pausing
- self.vmm.pauseVm(vmId)
- instance.state = InstanceState.Paused
-
- def unpauseVm(self, vmId):
- instance = self.getInstance(vmId)
- instance.state = InstanceState.Unpausing
- self.vmm.unpauseVm(vmId)
- instance.state = InstanceState.Running
-
- def shutdownVm(self, vmId):
- instance = self.getInstance(vmId)
- instance.state = InstanceState.ShuttingDown
- self.vmm.shutdownVm(vmId)
-
- def destroyVm(self, vmId):
- instance = self.getInstance(vmId)
- instance.state = InstanceState.Destroying
- self.vmm.destroyVm(vmId)
-
- def getVmInfo(self, vmId):
- instance = self.getInstance(vmId)
- return instance
-
- def vmmSpecificCall(self, vmId, arg):
- return self.vmm.vmmSpecificCall(vmId, arg)
-
- def listVms(self):
- return self.instances.keys()
-
- def liveCheck(self):
- return "alive"
-
- def statsThread(self):
+ # service thread function
+ def __statsThread(self):
if (self.statsInterval == 0):
return
while True:
@@ -323,12 +213,220 @@
self.log.exception('statsThread threw an exception')
time.sleep(self.statsInterval)
- def registerHost(self):
- cm = ConnectionManager(self.username, self.password, self.cmPort)[self.cmHost]
+ def __registerHost(self):
hostname = socket.gethostname()
# populate some defaults
# XXXstroucki: I think it's better if the nodemanager fills these in properly when registering with the clustermanager
memory = 0
cores = 0
version = "empty"
- #cm.registerHost(hostname, memory, cores, version)
+ #self.cm.registerHost(hostname, memory, cores, version)
+
+ def __getInstance(self, vmId):
+ instance = self.instances.get(vmId, None)
+ if instance is not None:
+ return instance
+
+ # refresh self.instances if not found
+ self.__loadVmInfo()
+ instance = self.instances.get(vmId, None)
+ if instance is not None:
+ return instance
+
+
+ raise TashiException(d={'errno':Errors.NoSuchVmId,'msg':"There is no vmId %d on this host" % (vmId)})
+
+ # remote
+ # Called from VMM to update self.instances
+ # but only changes are Exited, MigrateTrans and Running
+ # qemu.py calls this in the matchSystemPids thread
+ # xenpv.py: I have no real idea why it is called there
+ def vmStateChange(self, vmId, old, cur):
+ instance = self.__getInstance(vmId)
+
+ if (instance.state == cur):
+ # Don't do anything if state is what it should be
+ return True
+
+ if (old and instance.state != old):
+ # make a note of mismatch, but go on.
+ # the VMM should know best
+ self.log.warning('VM state was %s, call indicated %s' % (vmStates[instance.state], vmStates[old]))
+
+ instance.state = cur
+
+ self.__ACCOUNT("NM VM STATE CHANGE", instance=instance)
+
+ newInst = Instance(d={'state':cur})
+ success = lambda: None
+ # send the state change up to the CM
+ self.notifyCM.append((instance.id, newInst, old, success))
+ self.__flushNotifyCM()
+
+ # cache change locally
+ self.instances[vmId] = instance
+
+ if (cur == InstanceState.Exited):
+ # At this point, the VMM will clean up,
+ # so forget about this instance
+ del self.instances[vmId]
+ return True
+
+ return True
+
+ # remote
+ def createInstance(self, instance):
+ vmId = instance.vmId
+ self.instances[vmId] = instance
+
+
+ # remote
+ def instantiateVm(self, instance):
+ self.__ACCOUNT("NM VM INSTANTIATE", instance=instance)
+ try:
+ vmId = self.vmm.instantiateVm(instance)
+ #instance.vmId = vmId
+ #instance.state = InstanceState.Running
+ #self.instances[vmId] = instance
+ return vmId
+ except:
+ self.log.exception("Failed to start instance")
+
+ # remote
+ def suspendVm(self, vmId, destination):
+ instance = self.__getInstance(vmId)
+ self.__ACCOUNT("NM VM SUSPEND", instance=instance)
+
+ instance.state = InstanceState.Suspending
+ self.instances[vmId] = instance
+ threading.Thread(target=self.vmm.suspendVm, args=(vmId, destination)).start()
+
+ # called by resumeVm as thread
+ def __resumeVmHelper(self, instance, name):
+ self.vmm.resumeVmHelper(instance, name)
+ instance.state = InstanceState.Running
+ newInstance = Instance(d={'id':instance.id,'state':instance.state})
+ success = lambda: None
+ self.notifyCM.append((newInstance.id, newInstance, InstanceState.Resuming, success))
+ self.__flushNotifyCM()
+
+ # remote
+ def resumeVm(self, instance, name):
+ self.__ACCOUNT("NM VM RESUME", instance=instance)
+ instance.state = InstanceState.Resuming
+ instance.hostId = self.id
+ try:
+ instance.vmId = self.vmm.resumeVm(instance, name)
+ self.instances[instance.vmId] = instance
+ threading.Thread(target=self.__resumeVmHelper, args=(instance, name)).start()
+ except:
+ self.log.exception('resumeVm failed')
+ raise TashiException(d={'errno':Errors.UnableToResume,'msg':"resumeVm failed on the node manager"})
+ return instance.vmId
+
+ # remote
+ def prepReceiveVm(self, instance, source):
+ self.__ACCOUNT("NM VM MIGRATE RECEIVE PREP", instance=instance)
+ instance.vmId = -1
+ transportCookie = self.vmm.prepReceiveVm(instance, source.name)
+ return transportCookie
+
+ # remote
+ def prepSourceVm(self, vmId):
+ instance = self.__getInstance(vmId)
+ self.__ACCOUNT("NM VM MIGRATE SOURCE PREP", instance=instance)
+ instance.state = InstanceState.MigratePrep
+ self.instances[vmId] = instance
+
+ # called by migrateVm as thread
+ # XXXstroucki migrate out?
+ def __migrateVmHelper(self, instance, target, transportCookie):
+ self.vmm.migrateVm(instance.vmId, target.name, transportCookie)
+ del self.instances[instance.vmId]
+
+ # remote
+ # XXXstroucki migrate out?
+ def migrateVm(self, vmId, target, transportCookie):
+ instance = self.__getInstance(vmId)
+ self.__ACCOUNT("NM VM MIGRATE", instance=instance)
+ instance.state = InstanceState.MigrateTrans
+ self.instances[vmId] = instance
+ threading.Thread(target=self.__migrateVmHelper, args=(instance, target, transportCookie)).start()
+ return
+
+ # called by receiveVm as thread
+ # XXXstroucki migrate in?
+ def __receiveVmHelper(self, instance, transportCookie):
+ vmId = self.vmm.receiveVm(transportCookie)
+ instance.state = InstanceState.Running
+ instance.hostId = self.id
+ instance.vmId = vmId
+ self.instances[vmId] = instance
+ newInstance = Instance(d={'id':instance.id,'state':instance.state,'vmId':instance.vmId,'hostId':instance.hostId})
+ success = lambda: None
+ self.notifyCM.append((newInstance.id, newInstance, InstanceState.Running, success))
+ self.__flushNotifyCM()
+
+ # remote
+ # XXXstroucki migrate in?
+ def receiveVm(self, instance, transportCookie):
+ instance.state = InstanceState.MigrateTrans
+ vmId = instance.vmId
+ self.instances[vmId] = instance
+ self.__ACCOUNT("NM VM MIGRATE RECEIVE", instance=instance)
+ threading.Thread(target=self.__receiveVmHelper, args=(instance, transportCookie)).start()
+ return
+
+ # remote
+ def pauseVm(self, vmId):
+ instance = self.__getInstance(vmId)
+ self.__ACCOUNT("NM VM PAUSE", instance=instance)
+ instance.state = InstanceState.Pausing
+ self.instances[vmId] = instance
+ self.vmm.pauseVm(vmId)
+ instance.state = InstanceState.Paused
+ self.instances[vmId] = instance
+
+ # remote
+ def unpauseVm(self, vmId):
+ instance = self.__getInstance(vmId)
+ self.__ACCOUNT("NM VM UNPAUSE", instance=instance)
+ instance.state = InstanceState.Unpausing
+ self.instances[vmId] = instance
+ self.vmm.unpauseVm(vmId)
+ instance.state = InstanceState.Running
+ self.instances[vmId] = instance
+
+ # remote
+ def shutdownVm(self, vmId):
+ instance = self.__getInstance(vmId)
+ self.__ACCOUNT("NM VM SHUTDOWN", instance=instance)
+ instance.state = InstanceState.ShuttingDown
+ self.instances[vmId] = instance
+ self.vmm.shutdownVm(vmId)
+
+ # remote
+ def destroyVm(self, vmId):
+ instance = self.__getInstance(vmId)
+ self.__ACCOUNT("NM VM DESTROY", instance=instance)
+ instance.state = InstanceState.Destroying
+ self.instances[vmId] = instance
+ self.vmm.destroyVm(vmId)
+
+ # remote
+ def getVmInfo(self, vmId):
+ instance = self.__getInstance(vmId)
+ return instance
+
+ # remote
+ def vmmSpecificCall(self, vmId, arg):
+ return self.vmm.vmmSpecificCall(vmId, arg)
+
+ # remote
+ def listVms(self):
+ return self.instances.keys()
+
+ # remote
+ def liveCheck(self):
+ return "alive"
+
diff --git a/src/tashi/nodemanager/vmcontrol/qemu.py b/src/tashi/nodemanager/vmcontrol/qemu.py
index 69cfc12..7806f4b 100644
--- a/src/tashi/nodemanager/vmcontrol/qemu.py
+++ b/src/tashi/nodemanager/vmcontrol/qemu.py
@@ -26,12 +26,11 @@
import subprocess
import sys
import time
+import shlex
-# for scratch space support
-from os import system
-
-from tashi.rpycservices.rpyctypes import *
-from tashi.util import broken, logged, scrubString, boolean
+#from tashi.rpycservices.rpyctypes import *
+from tashi.rpycservices.rpyctypes import InstanceState, Host
+from tashi.util import scrubString, boolean
from tashi import version, stringPartition
from vmcontrolinterface import VmControlInterface
@@ -47,7 +46,7 @@
try:
listenSocket.listen(5)
ls = listenSocket.fileno()
- input = child.monitorFd
+ #input = child.monitorFd
output = child.monitorFd
#print "listen"
select.select([ls], [], [])
@@ -96,6 +95,10 @@
self.migrateTimeout = float(self.config.get("Qemu", "migrateTimeout"))
self.useMigrateArgument = boolean(self.config.get("Qemu", "useMigrateArgument"))
self.statsInterval = float(self.config.get("Qemu", "statsInterval"))
+ # XXXstroucki amount of reserved memory could be configurable
+ self.reservedMem = 512
+ # XXXstroucki perhaps make this configurable
+ self.ifPrefix = "tashi"
self.controlledVMs = {}
self.usedPorts = []
self.usedPortsLock = threading.Lock()
@@ -115,8 +118,10 @@
os.mkdir(self.INFO_DIR)
except:
pass
- self.scanInfoDir()
- threading.Thread(target=self.pollVMsLoop).start()
+
+ self.__scanInfoDir()
+
+ threading.Thread(target=self.__pollVMsLoop).start()
if (self.statsInterval > 0):
threading.Thread(target=self.statsThread).start()
@@ -124,7 +129,7 @@
def __init__(self, **attrs):
self.__dict__.update(attrs)
- def getSystemPids(self):
+ def __getHostPids(self):
"""Utility function to get a list of system PIDs that match the QEMU_BIN specified (/proc/nnn/exe)"""
pids = []
for f in os.listdir("/proc"):
@@ -136,83 +141,105 @@
pass
return pids
+ # extern
def getInstances(self):
"""Will return a dict of instances by vmId to the caller"""
return dict((x, self.controlledVMs[x].instance) for x in self.controlledVMs.keys())
- def matchSystemPids(self, controlledVMs):
+ def __matchHostPids(self, controlledVMs):
"""This is run in a separate polling thread and it must do things that are thread safe"""
- if self.nm is None:
- #XXXstroucki log may not be there yet either
- #self.log.info("NM hook not yet available")
- return
vmIds = controlledVMs.keys()
- pids = self.getSystemPids()
+ pids = self.__getHostPids()
+
for vmId in vmIds:
child = controlledVMs[vmId]
+ instance = child.instance
+ name = instance.name
if vmId not in pids:
+ # VM is no longer running, but is still
+ # considered controlled
+
+ # remove info file
os.unlink(self.INFO_DIR + "/%d"%(vmId))
+
+ # XXXstroucki why not use self.controlledVMs directly
+ # instead of mutating this fn's formal argument?
del controlledVMs[vmId]
+
+ # remove any stats (appropriate?)
try:
del self.stats[vmId]
except:
pass
+
if (child.vncPort >= 0):
self.vncPortLock.acquire()
self.vncPorts.remove(child.vncPort)
self.vncPortLock.release()
- log.info("Removing vmId %d" % (vmId))
+
+ log.info("Removing vmId %d because it is no longer running" % (vmId))
+
+ # if the VM was started from this process,
+ # wait on it
if (child.OSchild):
try:
os.waitpid(vmId, 0)
except:
- log.exception("waitpid failed")
+ log.exception("waitpid failed for vmId %d" % (vmId))
+ # recover the child's stderr and monitor
+ # output if possible
if (child.errorBit):
if (child.OSchild):
f = open("/tmp/%d.err" % (vmId), "w")
f.write(child.stderr.read())
f.close()
+
f = open("/tmp/%d.pty" % (vmId), "w")
for i in child.monitorHistory:
f.write(i)
f.close()
- #XXXstroucki remove scratch storage
+
+ # remove scratch storage
try:
if self.scratchVg is not None:
- scratch_name = child.instance.name
- log.info("Removing any scratch for " + scratch_name)
- cmd = "/sbin/lvremove -f %s" % self.scratchVg
- result = subprocess.Popen(cmd.split(), executable=cmd.split()[0], stdout=subprocess.PIPE).wait()
+ log.info("Removing any scratch for %s" % (name))
+ cmd = "/sbin/lvremove --quiet -f %s" % self.scratchVg
+ result = subprocess.Popen(cmd.split(), executable=cmd.split()[0], stdout=subprocess.PIPE, stderr=open(os.devnull, "w"), close_fds=True).wait()
except:
+ log.warning("Problem cleaning scratch volumes")
pass
+ # let the NM know
try:
if (not child.migratingOut):
self.nm.vmStateChange(vmId, None, InstanceState.Exited)
- except Exception, e:
- log.exception("vmStateChange failed")
+ except Exception:
+ log.exception("vmStateChange failed for VM %s" % (name))
else:
+ # VM is still running
try:
+
if (child.migratingOut):
self.nm.vmStateChange(vmId, None, InstanceState.MigrateTrans)
- else:
+ elif (instance.state == InstanceState.Orphaned) or \
+ (instance.state == InstanceState.Activating):
self.nm.vmStateChange(vmId, None, InstanceState.Running)
except:
- #XXXstroucki nm is initialised at different time
- log.exception("vmStateChange failed")
+ log.exception("vmStateChange failed for VM %s" % (name))
-
- def scanInfoDir(self):
+
+ # called once on startup
+ def __scanInfoDir(self):
"""This is not thread-safe and must only be used during class initialization"""
controlledVMs = {}
controlledVMs.update(map(lambda x: (int(x), self.anonClass(OSchild=False, errorBit=False, migratingOut=False)), os.listdir(self.INFO_DIR + "/")))
if (len(controlledVMs) == 0):
- log.info("No vm information found in %s", self.INFO_DIR)
+ log.info("No VM information found in %s" % (self.INFO_DIR))
for vmId in controlledVMs:
try:
- child = self.loadChildInfo(vmId)
+ child = self.__loadChildInfo(vmId)
self.vncPortLock.acquire()
if (child.vncPort >= 0):
self.vncPorts.append(child.vncPort)
@@ -223,40 +250,46 @@
#XXXstroucki ensure instance has vmId
child.instance.vmId = vmId
- self.controlledVMs[child.pid] = child
- log.info("Adding vmId %d" % (child.pid))
- except Exception, e:
+ self.controlledVMs[vmId] = child
+ except Exception:
log.exception("Failed to load VM info for %d", vmId)
else:
log.info("Loaded VM info for %d", vmId)
- # XXXstroucki NM may not be available yet here.
- try:
- self.matchSystemPids(self.controlledVMs)
- except:
- pass
-
- def pollVMsLoop(self):
+ # service thread
+ def __pollVMsLoop(self):
"""Infinite loop that checks for dead VMs"""
+
+ # As of 2011-12-30, nm is None when this is called, and
+ # is set later by the NM. Things further down require
+ # access to the NM, so wait until it is set.
+ # Moved into __pollVMsLoop since putting it in this thread
+ # will allow the init to complete and nm to be actually
+ # set.
+
+ while self.nm is None:
+ log.info("Waiting for NM initialization")
+ time.sleep(2)
+
while True:
try:
time.sleep(self.POLL_DELAY)
- self.matchSystemPids(self.controlledVMs)
+ self.__matchHostPids(self.controlledVMs)
except:
log.exception("Exception in poolVMsLoop")
- def waitForExit(self, vmId):
+ def __waitForExit(self, vmId):
"""This waits until an element is removed from the dictionary -- the polling thread must detect an exit"""
while vmId in self.controlledVMs:
time.sleep(self.POLL_DELAY)
- def getChildFromPid(self, pid):
+ def __getChildFromPid(self, pid):
"""Do a simple dictionary lookup, but raise a unique exception if the key doesn't exist"""
child = self.controlledVMs.get(pid, None)
if (not child):
raise Exception, "Uncontrolled vmId %d" % (pid)
return child
- def consumeAvailable(self, child):
+ def __consumeAvailable(self, child):
"""Consume characters one-by-one until they stop coming"""
monitorFd = child.monitorFd
buf = ""
@@ -299,9 +332,9 @@
child.monitorHistory.append(buf[len(needle):])
return buf[len(needle):]
- def enterCommand(self, child, command, expectPrompt = True, timeout = -1):
+ def __enterCommand(self, child, command, expectPrompt = True, timeout = -1):
"""Enter a command on the qemu monitor"""
- res = self.consumeAvailable(child)
+ res = self.__consumeAvailable(child)
os.write(child.monitorFd, command + "\n")
if (expectPrompt):
# XXXstroucki: receiving a vm can take a long time
@@ -309,7 +342,7 @@
res = self.consumeUntil(child, "(qemu) ", timeout=timeout)
return res
- def loadChildInfo(self, vmId):
+ def __loadChildInfo(self, vmId):
child = self.anonClass(pid=vmId)
info = open(self.INFO_DIR + "/%d"%(child.pid), "r")
(instance, pid, ptyFile) = cPickle.load(info)
@@ -331,11 +364,12 @@
child.vncPort = -1
return child
- def saveChildInfo(self, child):
+ def __saveChildInfo(self, child):
info = open(self.INFO_DIR + "/%d"%(child.pid), "w")
cPickle.dump((child.instance, child.pid, child.ptyFile), info)
info.close()
+ # extern
def getHostInfo(self, service):
host = Host()
host.id = service.id
@@ -344,7 +378,7 @@
memoryStr = subprocess.Popen(cmd.split(), executable=cmd.split()[0], stdout=subprocess.PIPE).stdout.read().strip().split()
if (memoryStr[2] == "kB"):
# XXXstroucki should have parameter for reserved mem
- host.memory = (int(memoryStr[1])/1024) - 512
+ host.memory = (int(memoryStr[1])/1024) - self.reservedMem
else:
log.warning('Unable to determine amount of physical memory - reporting 0')
host.memory = 0
@@ -353,22 +387,39 @@
host.decayed = False
host.version = version
return host
-
- def startVm(self, instance, source):
+
+ def __stripSpace(self, s):
+ return "".join(s.split())
+
+ def __startVm(self, instance, source):
"""Universal function to start a VM -- used by instantiateVM, resumeVM, and prepReceiveVM"""
- # Capture startVm Hints
+ # Capture __startVm Hints
# CPU hints
cpuModel = instance.hints.get("cpumodel")
+
cpuString = ""
if cpuModel:
+ # clean off whitespace
+ cpuModel = self.__stripSpace(cpuModel)
cpuString = "-cpu " + cpuModel
# Clock hints
clockString = instance.hints.get("clock", "dynticks")
+ # clean off whitespace
+ clockString = self.__stripSpace(clockString)
# Disk hints
+ # XXXstroucki: insert commentary on jcipar's performance
+ # measurements
+ # virtio is recommended, but linux will name devices
+ # vdX instead of sdX. This adds a trap for someone who
+ # converts a physical machine or other virtualization
+ # layer's image to run under Tashi.
diskInterface = instance.hints.get("diskInterface", "ide")
+ # clean off whitespace
+ diskInterface = self.__stripSpace(diskInterface)
+
diskString = ""
for index in range(0, len(instance.disks)):
@@ -398,10 +449,10 @@
diskString = diskString + "-drive " + ",".join(thisDiskList) + " "
- # scratch disk (should be integrated better)
+ # scratch disk
scratchSize = instance.hints.get("scratchSpace", "0")
scratchSize = int(scratchSize)
- scratch_file = None
+ scratchName = None
try:
if scratchSize > 0:
@@ -410,18 +461,21 @@
# create scratch disk
# XXXstroucki: needs to be cleaned somewhere
# XXXstroucki: clean user provided instance name
- scratch_name = "lv" + instance.name
+ scratchName = "lv%s" % instance.name
# XXXstroucki hold lock
# XXXstroucki check for capacity
- cmd = "/sbin/lvcreate -n" + scratch_name + " -L" + str(scratchSize) + "G " + self.scratchVg
+ cmd = "/sbin/lvcreate --quiet -n%s -L %dG %s" % (scratchName, scratchSize, self.scratchVg)
+ # XXXstroucki check result
result = subprocess.Popen(cmd.split(), executable=cmd.split()[0], stdout=subprocess.PIPE).wait()
index += 1
- thisDiskList = [ "file=/dev/%s/%s" % (self.scratchVg, scratch_name) ]
+ thisDiskList = [ "file=/dev/%s/%s" % (self.scratchVg, scratchName) ]
thisDiskList.append("if=%s" % diskInterface)
thisDiskList.append("index=%d" % index)
thisDiskList.append("cache=off")
+ # XXXstroucki force scratch disk to be
+ # persistent
if (True or disk.persistent):
snapshot = "off"
migrate = "off"
@@ -434,18 +488,21 @@
if (self.useMigrateArgument):
thisDiskList.append("migrate=%s" % migrate)
- diskString = diskString + "-drive " + ",".join(thisDiskList) + " "
+ diskString = "%s-drive %s " % (diskString, ",".join(thisDiskList))
except:
- print 'caught exception'
- raise 'exception'
+ log.exception('caught exception in scratch disk formation')
+ raise
# Nic hints
nicModel = instance.hints.get("nicModel", "virtio")
+ # clean off whitespace
+ nicModel = self.__stripSpace(nicModel)
+
nicString = ""
for i in range(0, len(instance.nics)):
nic = instance.nics[i]
- nicString = nicString + "-net nic,macaddr=%s,model=%s,vlan=%d -net tap,ifname=tashi%d.%d,vlan=%d,script=/etc/qemu-ifup.%d " % (nic.mac, nicModel, nic.network, instance.id, i, nic.network, nic.network)
+ nicString = nicString + "-net nic,macaddr=%s,model=%s,vlan=%d -net tap,ifname=%s%d.%d,vlan=%d,script=/etc/qemu-ifup.%d " % (nic.mac, nicModel, nic.network, self.ifPrefix, instance.id, i, nic.network, nic.network)
# ACPI
if (boolean(instance.hints.get("noAcpi", False))):
@@ -455,14 +512,16 @@
# Construct the qemu command
strCmd = "%s %s %s -clock %s %s %s -m %d -smp %d -serial null -vnc none -monitor pty" % (self.QEMU_BIN, noAcpiString, cpuString, clockString, diskString, nicString, instance.memory, instance.cores)
- cmd = strCmd.split()
if (source):
- cmd = cmd + ["-incoming", source]
- strCmd = strCmd + " -incoming %s" % (source)
- log.info("QEMU command: %s" % (strCmd))
+ strCmd = '%s -incoming "%s"' % (strCmd, source)
+ # XXXstroucki perhaps we're doing it backwards
+ cmd = shlex.split(strCmd)
+
+ log.info("Executing command: %s" % (strCmd))
(pipe_r, pipe_w) = os.pipe()
pid = os.fork()
if (pid == 0):
+ # child process
pid = os.getpid()
os.setpgid(pid, pid)
os.close(pipe_r)
@@ -477,146 +536,195 @@
os.close(i)
except:
pass
+
# XXXstroucki unfortunately no kvm option yet
+ # to direct COW differences elsewhere, so change
+ # this process' TMPDIR, which kvm will honour
os.environ['TMPDIR'] = self.scratchDir
os.execl(self.QEMU_BIN, *cmd)
sys.exit(-1)
+
+ # parent process
os.close(pipe_w)
child = self.anonClass(pid=pid, instance=instance, stderr=os.fdopen(pipe_r, 'r'), migratingOut = False, monitorHistory=[], errorBit = True, OSchild = True)
child.ptyFile = None
child.vncPort = -1
child.instance.vmId = child.pid
- self.saveChildInfo(child)
+ self.__saveChildInfo(child)
self.controlledVMs[child.pid] = child
log.info("Adding vmId %d" % (child.pid))
return (child.pid, cmd)
- def getPtyInfo(self, child, issueContinue):
+ def __getPtyInfo(self, child, issueContinue):
ptyFile = None
while not ptyFile:
- l = child.stderr.readline()
- if (l == ""):
+ line = child.stderr.readline()
+ if (line == ""):
try:
os.waitpid(child.pid, 0)
except:
log.exception("waitpid failed")
raise Exception, "Failed to start VM -- ptyFile not found"
- if (l.find("char device redirected to ") != -1):
- ptyFile=l[26:].strip()
+ redirLine = "char device redirected to "
+ if (line.find(redirLine) != -1):
+ ptyFile=line[len(redirLine):].strip()
break
child.ptyFile = ptyFile
child.monitorFd = os.open(child.ptyFile, os.O_RDWR | os.O_NOCTTY)
child.monitor = os.fdopen(child.monitorFd)
- self.saveChildInfo(child)
+ self.__saveChildInfo(child)
if (issueContinue):
# XXXstroucki: receiving a vm can take a long time
- self.enterCommand(child, "c", timeout=None)
+ self.__enterCommand(child, "c", timeout=None)
- def stopVm(self, vmId, target, stopFirst):
+ def __stopVm(self, vmId, target, stopFirst):
"""Universal function to stop a VM -- used by suspendVM, migrateVM """
- child = self.getChildFromPid(vmId)
+ child = self.__getChildFromPid(vmId)
if (stopFirst):
- self.enterCommand(child, "stop")
+ self.__enterCommand(child, "stop")
if (target):
retry = self.migrationRetries
while (retry > 0):
- res = self.enterCommand(child, "migrate -i %s" % (target), timeout=self.migrateTimeout)
+ # migrate in foreground respecting cow backed
+ # images
+ # XXXstroucki if we're doing this in the fg
+ # then it may still be ongoing when the timeout
+ # happens, and no way of interrupting it
+ # trying to restart the migration by running
+ # the command again (when qemu is ready to
+ # listen again) is probably not helpful
+ success = False
+ res = self.__enterCommand(child, "migrate -i %s" % (target), timeout=self.migrateTimeout)
retry = retry - 1
if (res.find("migration failed") == -1):
- retry = -1
+ success = True
+ retry = 0
+ break
else:
log.error("Migration (transiently) failed: %s\n", res)
- if (retry == 0):
+ if (retry == 0) and (success is False):
log.error("Migration failed: %s\n", res)
child.errorBit = True
raise RuntimeError
- self.enterCommand(child, "quit", expectPrompt=False)
+ # XXXstroucki what if migration is still ongoing, and
+ # qemu is not listening?
+ self.__enterCommand(child, "quit", expectPrompt=False)
return vmId
-
+
+ # extern
def instantiateVm(self, instance):
- (vmId, cmd) = self.startVm(instance, None)
- child = self.getChildFromPid(vmId)
- self.getPtyInfo(child, False)
- child.cmd = cmd
- self.saveChildInfo(child)
- return vmId
-
- def suspendVm(self, vmId, target):
- tmpTarget = "/tmp/tashi_qemu_suspend_%d_%d" % (os.getpid(), vmId)
- # XXX: Use fifo to improve performance
- vmId = self.stopVm(vmId, "\"exec:gzip -c > %s\"" % (tmpTarget), True)
- self.dfs.copyTo(tmpTarget, target)
- return vmId
-
- def resumeVmHelper(self, instance, source):
- child = self.getChildFromPid(instance.vmId)
try:
- self.getPtyInfo(child, True)
- except RuntimeError, e:
+ (vmId, cmd) = self.__startVm(instance, None)
+ child = self.__getChildFromPid(vmId)
+ self.__getPtyInfo(child, False)
+ child.cmd = cmd
+ self.nm.createInstance(child.instance)
+ self.nm.vmStateChange(vmId, None, InstanceState.Running)
+ # XXXstroucki Should make sure Running state is saved
+ # otherwise on restart it will appear as Activating
+ # until we update the state in __matchHostPids
+ child.instance.state = InstanceState.Running
+ self.__saveChildInfo(child)
+ return vmId
+ except:
+ log.exception("instantiateVm failed")
+ raise
+
+ # extern
+ def suspendVm(self, vmId, target):
+ tmpTarget = "/%s/tashi_qemu_suspend_%d_%d" % (self.scratchDir, os.getpid(), vmId)
+ # XXX: Use fifo to improve performance
+ vmId = self.__stopVm(vmId, "\"exec:gzip -c > %s\"" % (tmpTarget), True)
+ self.dfs.copyTo(tmpTarget, target)
+ os.unlink(tmpTarget)
+ return vmId
+
+ # extern
+ def resumeVmHelper(self, instance, source):
+ child = self.__getChildFromPid(instance.vmId)
+ try:
+ self.__getPtyInfo(child, True)
+ except RuntimeError:
log.error("Failed to get pty info -- VM likely died")
child.errorBit = True
raise
status = "paused"
while ("running" not in status):
- status = self.enterCommand(child, "info status")
+ status = self.__enterCommand(child, "info status")
time.sleep(1)
+ child.instance.state = InstanceState.Running
+ self.__saveChildInfo(child)
+ # extern
def resumeVm(self, instance, source):
fn = self.dfs.getLocalHandle("%s" % (source))
- (vmId, cmd) = self.startVm(instance, "exec:zcat %s" % (fn))
- child = self.getChildFromPid(vmId)
+ (vmId, cmd) = self.__startVm(instance, "exec:zcat %s" % (fn))
+ child = self.__getChildFromPid(vmId)
child.cmd = cmd
return vmId
-
+
+ def __checkPortListening(self, port):
+ lc = 0
+ # XXXpipe: find whether something is listening yet on the port
+ (stdin, stdout) = os.popen2("netstat -ln | grep 0.0.0.0:%d | wc -l" % (port))
+ stdin.close()
+ r = stdout.read()
+ lc = int(r.strip())
+ if (lc < 1):
+ return False
+ else:
+ return True
+
+ # extern
def prepReceiveVm(self, instance, source):
self.usedPortsLock.acquire()
- port = int(random.random()*1000+19000)
- while port in self.usedPorts:
- port = int(random.random()*1000+19000)
+ while True:
+ port = random.randint(19000, 20000)
+ if port not in self.usedPorts:
+ break
+
self.usedPorts.append(port)
self.usedPortsLock.release()
- (vmId, cmd) = self.startVm(instance, "tcp:0.0.0.0:%d" % (port))
+ (vmId, cmd) = self.__startVm(instance, "tcp:0.0.0.0:%d" % (port))
transportCookie = cPickle.dumps((port, vmId, socket.gethostname()))
- child = self.getChildFromPid(vmId)
+ child = self.__getChildFromPid(vmId)
child.cmd = cmd
child.transportCookie = transportCookie
- self.saveChildInfo(child)
- # XXX: Cleanly wait until the port is open
- lc = 0
- while (lc < 1):
-# XXXpipe: find whether something is listening yet on the port
- (stdin, stdout) = os.popen2("netstat -ln | grep 0.0.0.0:%d | wc -l" % (port))
- stdin.close()
- r = stdout.read()
- lc = int(r.strip())
- if (lc < 1):
- time.sleep(1.0)
+ self.__saveChildInfo(child)
+ # XXX: Cleanly wait until the port is listening
+ while self.__checkPortListening(port) is not True:
+ time.sleep(1)
+
return transportCookie
+ # extern
def migrateVm(self, vmId, target, transportCookie):
self.migrationSemaphore.acquire()
try:
(port, _vmId, _hostname) = cPickle.loads(transportCookie)
- child = self.getChildFromPid(vmId)
+ child = self.__getChildFromPid(vmId)
child.migratingOut = True
- res = self.stopVm(vmId, "tcp:%s:%d" % (target, port), False)
+ # tell the VM to live-migrate out
+ res = self.__stopVm(vmId, "tcp:%s:%d" % (target, port), False)
# XXX: Some sort of feedback would be nice
# XXX: Should we block?
- self.waitForExit(vmId)
+ # XXXstroucki: isn't this what __waitForExit does?
+ self.__waitForExit(vmId)
finally:
self.migrationSemaphore.release()
return res
+ # extern
def receiveVm(self, transportCookie):
(port, vmId, _hostname) = cPickle.loads(transportCookie)
try:
- child = self.getChildFromPid(vmId)
+ child = self.__getChildFromPid(vmId)
except:
log.error("Failed to get child info; transportCookie = %s; hostname = %s" % (str(cPickle.loads(transportCookie)), socket.hostname()))
raise
try:
- self.getPtyInfo(child, True)
- except RuntimeError, e:
+ self.__getPtyInfo(child, True)
+ except RuntimeError:
log.error("Failed to get pty info -- VM likely died")
child.errorBit = True
raise
@@ -625,79 +733,121 @@
self.usedPortsLock.release()
return vmId
+ # extern
def pauseVm(self, vmId):
- child = self.getChildFromPid(vmId)
- self.enterCommand(child, "stop")
+ child = self.__getChildFromPid(vmId)
+ self.__enterCommand(child, "stop")
+ # XXXstroucki we have no Stopped state, so consider
+ # the VM still Running?
+ # extern
def unpauseVm(self, vmId):
- child = self.getChildFromPid(vmId)
- self.enterCommand(child, "c")
+ child = self.__getChildFromPid(vmId)
+ self.__enterCommand(child, "c")
+ # XXXstroucki as above, should this be a state change
+ # or not?
+ # extern
def shutdownVm(self, vmId):
"""'system_powerdown' doesn't seem to actually shutdown the VM on some versions of KVM with some versions of Linux"""
- child = self.getChildFromPid(vmId)
- self.enterCommand(child, "system_powerdown")
+ # If clean shutdown is desired, should try on VM first,
+ # shutdownVm second and if that doesn't work use
+ # destroyVm
+ child = self.__getChildFromPid(vmId)
+ self.__enterCommand(child, "system_powerdown")
+ # extern
def destroyVm(self, vmId):
- child = self.getChildFromPid(vmId)
+ child = self.__getChildFromPid(vmId)
child.migratingOut = False
# XXX: the child could have exited between these two points, but I don't know how to fix that since it might not be our child process
os.kill(child.pid, signal.SIGKILL)
+ def __specificStartVnc(self, vmId):
+ child = self.__getChildFromPid(vmId)
+ hostname = socket.gethostname()
+ if (child.vncPort == -1):
+ self.vncPortLock.acquire()
+ port = 0
+ while (port in self.vncPorts):
+ port += 1
+
+ self.vncPorts.append(port)
+ self.vncPortLock.release()
+ self.__enterCommand(child, "change vnc :%d" % (port))
+ child.vncPort = port
+ self.__saveChildInfo(child)
+ port = child.vncPort
+ return "VNC running on %s:%d" % (hostname, port + 5900)
+
+ def __specificStopVnc(self, vmId):
+ child = self.__getChildFromPid(vmId)
+ self.__enterCommand(child, "change vnc none")
+ if (child.vncPort != -1):
+ self.vncPortLock.acquire()
+ self.vncPorts.remove(child.vncPort)
+ self.vncPortLock.release()
+ child.vncPort = -1
+ self.__saveChildInfo(child)
+ return "VNC halted"
+
+ def __specificChangeCdRom(self, vmId, iso):
+ child = self.__getChildFromPid(vmId)
+ imageLocal = self.dfs.getLocalHandle("images/" + iso)
+ self.__enterCommand(child, "change ide1-cd0 %s" % (imageLocal))
+ return "Changed ide1-cd0 to %s" % (iso)
+
+ def __specificStartConsole(self, vmId):
+ child = self.__getChildFromPid(vmId)
+ hostname = socket.gethostname()
+ self.consolePortLock.acquire()
+ # XXXstroucki why not use the existing ports scheme?
+ consolePort = self.consolePort
+ self.consolePort += 1
+ self.consolePortLock.release()
+ threading.Thread(target=controlConsole, args=(child,consolePort)).start()
+ return "Control console listening on %s:%d" % (hostname, consolePort)
+
+ # extern
def vmmSpecificCall(self, vmId, arg):
arg = arg.lower()
+ changeCdText = "changecdrom:"
+
if (arg == "startvnc"):
- child = self.getChildFromPid(vmId)
- hostname = socket.gethostname()
- if (child.vncPort == -1):
- self.vncPortLock.acquire()
- port = 0
- while (port in self.vncPorts):
- port = port + 1
- self.vncPorts.append(port)
- self.vncPortLock.release()
- self.enterCommand(child, "change vnc :%d" % (port))
- child.vncPort = port
- self.saveChildInfo(child)
- port = child.vncPort
- return "VNC started on %s:%d" % (hostname, port+5900)
+ return self.__specificStartVnc(vmId)
+
elif (arg == "stopvnc"):
- child = self.getChildFromPid(vmId)
- self.enterCommand(child, "change vnc none")
- if (child.vncPort != -1):
- self.vncPortLock.acquire()
- self.vncPorts.remove(child.vncPort)
- self.vncPortLock.release()
- child.vncPort = -1
- self.saveChildInfo(child)
- return "VNC halted"
- elif (arg.startswith("changecdrom:")):
- child = self.getChildFromPid(vmId)
- iso = scrubString(arg[12:])
- imageLocal = self.dfs.getLocalHandle("images/" + iso)
- self.enterCommand(child, "change ide1-cd0 %s" % (imageLocal))
- return "Changed ide1-cd0 to %s" % (iso)
+ return self.__specificStopVnc(vmId)
+
+ elif (arg.startswith(changeCdText)):
+ iso = scrubString(arg[len(changeCdText):])
+ return self.__specificChangeCdRom(vmId, iso)
+
elif (arg == "startconsole"):
- child = self.getChildFromPid(vmId)
- hostname = socket.gethostname()
- self.consolePortLock.acquire()
- consolePort = self.consolePort
- self.consolePort = self.consolePort+1
- self.consolePortLock.release()
- threading.Thread(target=controlConsole, args=(child,consolePort)).start()
- return "Control console listenting on %s:%d" % (hostname, consolePort)
+ return self.__specificStartConsole(vmId)
+
elif (arg == "list"):
- return "startVnc\nstopVnc\nchangeCdrom:<image.iso>\nstartConsole"
+ commands = [
+ "startVnc",
+ "stopVnc",
+ "changeCdrom:<image.iso>",
+ "startConsole",
+ ]
+ return "\n".join(commands)
+
else:
- return "Unknown arg %s" % (arg)
+ return "Unknown command %s" % (arg)
+ # extern
def listVms(self):
return self.controlledVMs.keys()
-
+
+ # thread
def statsThread(self):
ticksPerSecond = float(os.sysconf('SC_CLK_TCK'))
netStats = {}
cpuStats = {}
+ # XXXstroucki be more exact here?
last = time.time() - self.statsInterval
while True:
now = time.time()
@@ -706,7 +856,7 @@
netData = f.readlines()
f.close()
for l in netData:
- if (l.find("tashi") != -1):
+ if (l.find(self.ifPrefix) != -1):
(dev, sep, ld) = stringPartition(l, ":")
dev = dev.strip()
ws = ld.split()
@@ -714,6 +864,9 @@
sendBytes = float(ws[8])
(recvMBs, sendMBs, lastRecvBytes, lastSendBytes) = netStats.get(dev, (0.0, 0.0, recvBytes, sendBytes))
if (recvBytes < lastRecvBytes):
+ # We seem to have overflowed
+ # XXXstroucki How likely is this to happen?
+
if (lastRecvBytes > 2**32):
lastRecvBytes = lastRecvBytes - 2**64
else:
@@ -743,13 +896,13 @@
child = self.controlledVMs[vmId]
(recvMBs, sendMBs, recvBytes, sendBytes) = (0.0, 0.0, 0.0, 0.0)
for i in range(0, len(child.instance.nics)):
- netDev = "tashi%d.%d" % (child.instance.id, i)
+ netDev = "%s%d.%d" % (self.ifPrefix, child.instance.id, i)
(tmpRecvMBs, tmpSendMBs, tmpRecvBytes, tmpSendBytes) = netStats.get(netDev, (0.0, 0.0, 0.0, 0.0))
(recvMBs, sendMBs, recvBytes, sendBytes) = (recvMBs + tmpRecvMBs, sendMBs + tmpSendMBs, recvBytes + tmpRecvBytes, sendBytes + tmpSendBytes)
self.stats[vmId] = self.stats.get(vmId, {})
child = self.controlledVMs.get(vmId, None)
if (child):
- res = self.enterCommand(child, "info blockstats")
+ res = self.__enterCommand(child, "info blockstats")
for l in res.split("\n"):
(device, sep, data) = stringPartition(l, ": ")
if (data != ""):
@@ -767,6 +920,7 @@
log.exception("statsThread threw an exception")
last = now
time.sleep(self.statsInterval)
-
+
+ # extern
def getStats(self, vmId):
return self.stats.get(vmId, {})
diff --git a/src/tashi/parallel.py b/src/tashi/parallel.py
index 8b9a217..09fe57e 100644
--- a/src/tashi/parallel.py
+++ b/src/tashi/parallel.py
@@ -116,8 +116,8 @@
# Test Code
##############################
import unittest
-import sys
-import time
+#import sys
+#import time
class TestThreadPool(unittest.TestCase):
def setUp(self):
diff --git a/src/tashi/rpycservices/rpycservices.py b/src/tashi/rpycservices/rpycservices.py
index 3deac45..c66a40e 100644
--- a/src/tashi/rpycservices/rpycservices.py
+++ b/src/tashi/rpycservices/rpycservices.py
@@ -16,11 +16,12 @@
# under the License.
import rpyc
-from tashi.rpycservices.rpyctypes import *
+from tashi.rpycservices.rpyctypes import Instance, Host, User
import cPickle
clusterManagerRPCs = ['createVm', 'shutdownVm', 'destroyVm', 'suspendVm', 'resumeVm', 'migrateVm', 'pauseVm', 'unpauseVm', 'getHosts', 'getNetworks', 'getUsers', 'getInstances', 'vmmSpecificCall', 'registerNodeManager', 'vmUpdate', 'activateVm', 'registerHost', 'getImages', 'copyImage']
nodeManagerRPCs = ['instantiateVm', 'shutdownVm', 'destroyVm', 'suspendVm', 'resumeVm', 'prepReceiveVm', 'prepSourceVm', 'migrateVm', 'receiveVm', 'pauseVm', 'unpauseVm', 'getVmInfo', 'listVms', 'vmmSpecificCall', 'getHostInfo', 'liveCheck']
+accountingRPCs = ['record']
def clean(args):
"""Cleans the object so cPickle can be used."""
@@ -61,7 +62,7 @@
"""Returns a function that makes the RPC call. No keyword arguments allowed when calling this function."""
if self.conn.closed == True:
self.conn = self.createConn()
- if name not in clusterManagerRPCs and name not in nodeManagerRPCs:
+ if name not in clusterManagerRPCs and name not in nodeManagerRPCs and name not in accountingRPCs:
return None
def connectWrap(*args):
args = cPickle.dumps(clean(args))
@@ -81,6 +82,8 @@
# Note: self.service and self._type are set before rpyc.utils.server.ThreadedServer is started.
def checkValidUser(self, functionName, clientUsername, args):
"""Checks whether the operation requested by the user is valid based on clientUsername. An exception will be thrown if not valid."""
+ if self._type == 'AccountingService':
+ return
if self._type == 'NodeManagerService':
return
if clientUsername in ['nodeManager', 'agent', 'root']:
@@ -114,4 +117,7 @@
return makeCall
if self._type == 'NodeManagerService' and name in nodeManagerRPCs:
return makeCall
+ if self._type == 'AccountingService' and name in accountingRPCs:
+ return makeCall
+
raise AttributeError('RPC does not exist')
diff --git a/src/tashi/util.py b/src/tashi/util.py
index ae4b17d..4eb0981 100644
--- a/src/tashi/util.py
+++ b/src/tashi/util.py
@@ -16,11 +16,11 @@
# under the License.
import ConfigParser
-import cPickle
+#import cPickle
import os
-import select
+#import select
import signal
-import struct
+#import struct
import sys
import threading
import time
@@ -28,7 +28,6 @@
import types
import getpass
-import rpyc
from tashi.rpycservices import rpycservices
from tashi.rpycservices.rpyctypes import TashiException, Errors, InstanceState, HostState
@@ -109,7 +108,7 @@
def newFunc(*args, **kw):
try:
return cur(*args, **kw)
- except Exception, e:
+ except:
self.__dict__['__current_obj__'] = self.__dict__['__failsafe_obj__']
return fail(*args, **kw)
return newFunc
@@ -197,9 +196,9 @@
def newFunc(*args, **kw):
try:
return oldFunc(*args, **kw)
- except TashiException, e:
+ except TashiException:
raise
- except Exception, e:
+ except:
self = args[0]
if (self.convertExceptions):
raise TashiException(d={'errno':Errors.ConvertedException, 'msg': traceback.format_exc(10)})
@@ -210,7 +209,7 @@
"""Creates many permutations of a list of locations to look for config
files and then loads them"""
config = ConfigParser.ConfigParser()
- baseLocations = ['./etc/', '/usr/share/tashi/', '/etc/tashi/', os.path.expanduser('~/.tashi/')]
+ baseLocations = ['/usr/local/tashi/etc/', '/usr/share/tashi/', '/etc/tashi/', os.path.expanduser('~/.tashi/')]
names = ['Tashi'] + additionalNames
names = reduce(lambda x, y: x + [y+"Defaults", y], names, [])
allLocations = reduce(lambda x, y: x + reduce(lambda z, a: z + [y + a + ".cfg"], names, []), baseLocations, []) + additionalFiles
@@ -264,10 +263,12 @@
def createClient(config):
cfgHost = config.get('Client', 'clusterManagerHost')
cfgPort = config.get('Client', 'clusterManagerPort')
- cfgTimeout = config.get('Client', 'clusterManagerTimeout')
+ #XXXstroucki nothing uses timeout right now
+ #cfgTimeout = config.get('Client', 'clusterManagerTimeout')
host = os.getenv('TASHI_CM_HOST', cfgHost)
port = os.getenv('TASHI_CM_PORT', cfgPort)
- timeout = float(os.getenv('TASHI_CM_TIMEOUT', cfgTimeout)) * 1000.0
+ #XXXstroucki nothing uses timeout right now
+ #timeout = float(os.getenv('TASHI_CM_TIMEOUT', cfgTimeout)) * 1000.0
authAndEncrypt = boolean(config.get('Security', 'authAndEncrypt'))
if authAndEncrypt:
diff --git a/src/tashi/version.py b/src/tashi/version.py
index 45865d6..1fd7997 100644
--- a/src/tashi/version.py
+++ b/src/tashi/version.py
@@ -15,4 +15,4 @@
# specific language governing permissions and limitations
# under the License.
-version = "201111"
+version = "201202"