blob: 8ae2c7ff7979df5f00a7735837c812e1ce098e56 [file] [log] [blame]
#!/usr/bin/env python3
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import warnings
warnings.filterwarnings('ignore')
import json
import sys
import winrm
def fail(message):
    """Report an error on stdout as JSON and terminate with exit status 1.

    Args:
        message: Value stored under the ``"error"`` key of the emitted JSON
            object.

    Raises:
        SystemExit: Always, with exit code 1.
    """
    payload = {"error": message}
    print(json.dumps(payload))
    raise SystemExit(1)
def succeed(data):
    """Emit the result on stdout as JSON and terminate with exit status 0.

    Args:
        data: JSON-serialisable result object to print.

    Raises:
        SystemExit: Always, with exit code 0.
    """
    serialized = json.dumps(data)
    print(serialized)
    raise SystemExit(0)
class HyperVManager:
    """Drive the lifecycle of one Hyper-V virtual machine over WinRM.

    The constructor reads the JSON payload at ``config_path``, flattens the
    fields the operations need into ``self.data`` and prepares a WinRM
    session for the Hyper-V host. Each public operation reports its outcome
    through the module-level ``succeed``/``fail`` helpers, which print one
    JSON document and terminate the process.
    """

    # Characters the PowerShell parser accepts as single-quote delimiters
    # (ASCII apostrophe plus the typographic variants). All of them must be
    # doubled inside a single-quoted literal.
    _PS_SINGLE_QUOTES = "'\u2018\u2019\u201a\u201b"

    def __init__(self, config_path):
        """Load the payload and build the WinRM session.

        Args:
            config_path: Path of the JSON payload file describing the host,
                the VM and the optional action parameters.
        """
        self.config_path = config_path
        self.data = self.parse_json()
        self.session = self.init_winrm_session()

    @classmethod
    def _ps_quote(cls, value):
        """Return ``value`` as a single-quoted PowerShell string literal.

        Single-quoted literals are not subject to ``$variable``, ``$(...)``
        or backtick expansion, so payload values cannot inject PowerShell
        code into the commands built by this class.
        """
        text = str(value)
        for quote in cls._PS_SINGLE_QUOTES:
            text = text.replace(quote, quote * 2)
        return f"'{text}'"

    def parse_json(self):
        """Read the payload file and return the flattened settings dict.

        Returns:
            dict with the host keys ``url``, ``username``, ``password``,
            ``network_switch``, ``vhd_path``, ``vm_path`` and
            ``cert_validation``; the VM keys ``cpus``, ``memory``,
            ``vmname`` and ``nics``; ``snapshot_name`` (empty string when
            not supplied); and, when the payload carries virtual machine
            details, ``template_type``, ``generation``, ``template_path``,
            ``iso_path`` and ``vhd_size_gb``.

        Any problem (unreadable file, invalid JSON, missing required field)
        is reported through ``fail``, which terminates the process.
        """
        try:
            with open(self.config_path, 'r', encoding='utf-8') as f:
                json_data = json.load(f)

            external_details = json_data["externaldetails"]
            # Default to an empty dict so that an absent section surfaces as
            # a "missing required field" KeyError instead of a TypeError.
            host = external_details.get("host") or {}

            verify_tls = host.get("verify_tls_certificate")
            if verify_tls is None:
                verify_tls = "true"
            # str() tolerates JSON booleans as well as "true"/"false" strings.
            verify_tls = str(verify_tls).strip().lower()

            data = {
                "url": host["url"],
                "username": host["username"],
                "password": host["password"],
                "network_switch": host["network_switch"],
                "vhd_path": host["vhd_path"],
                "vm_path": host["vm_path"],
                "cert_validation": "validate" if verify_tls == "true" else "ignore",
            }

            vm_details = external_details.get("virtualmachine") or {}
            if vm_details:
                data["template_type"] = vm_details["template_type"]
                data["generation"] = vm_details.get("generation", 1)
                data["template_path"] = vm_details.get("template_path", "")
                data["iso_path"] = vm_details.get("iso_path", "")
                data["vhd_size_gb"] = vm_details.get("vhd_size_gb", "")

            cloudstack_vm = json_data["cloudstack.vm.details"]
            data["cpus"] = cloudstack_vm["cpus"]
            data["memory"] = cloudstack_vm["minRam"]
            data["vmname"] = cloudstack_vm["name"]
            data["nics"] = [
                {
                    "mac": nic["mac"],
                    "vlan": nic["broadcastUri"].replace("vlan://", ""),
                }
                for nic in cloudstack_vm.get("nics", [])
            ]

            # Always present so the snapshot operations can emit their own
            # "missing field" error instead of tripping over a KeyError.
            parameters = json_data.get("parameters") or {}
            data["snapshot_name"] = parameters.get("snapshot_name", "")
            return data
        except KeyError as e:
            fail(f"Missing required field in JSON: {str(e)}")
        except Exception as e:
            fail(f"Error parsing JSON: {str(e)}")

    def init_winrm_session(self):
        """Build the WinRM session for the configured host.

        The endpoint is ``https://<url>:5986/wsman`` with NTLM transport;
        certificate validation follows ``self.data["cert_validation"]``.
        """
        return winrm.Session(
            f"https://{self.data['url']}:5986/wsman",
            auth=(self.data["username"], self.data["password"]),
            transport='ntlm',
            server_cert_validation=self.data["cert_validation"]
        )

    def run_ps_int(self, command):
        """Run a PowerShell command and return its decoded standard output.

        Raises:
            RuntimeError: If the command exits with a non-zero status; the
                message is the decoded standard error.
        """
        r = self.session.run_ps(command)
        # errors="replace" keeps an undecodable byte in the remote output
        # from hiding the real result behind a UnicodeDecodeError.
        if r.status_code != 0:
            raise RuntimeError(r.std_err.decode(errors="replace"))
        return r.std_out.decode(errors="replace")

    def run_ps(self, command):
        """Run a PowerShell command, reporting any error through ``fail``."""
        try:
            return self.run_ps_int(command)
        except Exception as e:
            fail(str(e))

    def vm_not_present(self, exception):
        """Return True if ``exception`` is Hyper-V's "VM not found" error."""
        vm_not_present_str = f'Hyper-V was unable to find a virtual machine with name "{self.data["vmname"]}"'
        return vm_not_present_str in str(exception)

    def _run_cleanup(self, commands):
        """Run rollback commands best-effort and return their error messages.

        Every command is attempted even if an earlier one fails, so that a
        failed step cannot prevent the remaining resources from being
        released.
        """
        errors = []
        for command in commands:
            try:
                self.run_ps_int(command)
            except Exception as cleanup_error:
                errors.append(str(cleanup_error).strip())
        return errors

    def create(self):
        """Create, configure and start the virtual machine.

        For ``template_type == "iso"`` a new VHDX is created and the ISO is
        attached as a DVD drive; otherwise the template disk is copied to
        the VM's VHDX path. The default network adapter is replaced by one
        adapter per NIC in the payload. If any remote step fails, the VM
        and the disk created so far are removed before the error is
        reported.
        """
        q = self._ps_quote
        data = self.data
        vm_name = data["vmname"]
        cpus = data["cpus"]
        # Floor division yields an integer literal (e.g. 512MB, not 512.0MB).
        memory_mb = int(data["memory"]) // (1024 * 1024)
        template_path = data.get("template_path", "")
        vhd_path = data["vhd_path"] + "\\" + vm_name + ".vhdx"
        vhd_size_gb = data.get("vhd_size_gb", "")
        iso_path = data.get("iso_path", "")
        network_switch = data["network_switch"]
        vm_path = data["vm_path"]
        template_type = data.get("template_type", "template")

        # The generation may arrive as an int or a string; it is inserted
        # unquoted into the command, so restrict it to the valid values.
        generation = str(data.get("generation", 1)).strip()
        if generation not in ("1", "2"):
            fail(f"Invalid value in JSON for generation: {generation}")

        # Validate everything before touching the host.
        if template_type == "iso":
            if not iso_path:
                fail("Missing required field in JSON: iso_path")
            if vhd_size_gb == "" or vhd_size_gb is None:
                fail("Missing required field in JSON: vhd_size_gb")
            vhd_size_gb = str(vhd_size_gb).strip()
            size_digits = vhd_size_gb.replace(".", "", 1)
            # Unquoted in the command as well: allow plain decimals only.
            if not size_digits or any(ch not in "0123456789" for ch in size_digits):
                fail(f"Invalid value in JSON for vhd_size_gb: {vhd_size_gb}")
        elif not template_path:
            fail("Missing required field in JSON: template_path")

        new_vm = (
            f'New-VM -Name {q(vm_name)} -MemoryStartupBytes {memory_mb}MB '
            f'-Generation {generation} -Path {q(vm_path)} '
        )

        vhd_created = False
        vm_created = False
        try:
            if template_type == "iso":
                self.run_ps_int(
                    new_vm + f'-NewVHDPath {q(vhd_path)} -NewVHDSizeBytes {vhd_size_gb}GB'
                )
                # New-VM created both the VM and its disk; record that before
                # the next step so a failure there still rolls both back.
                vm_created = True
                vhd_created = True
                self.run_ps_int(f'Add-VMDvdDrive -VMName {q(vm_name)} -Path {q(iso_path)}')
            else:
                self.run_ps_int(f'Copy-Item {q(template_path)} {q(vhd_path)}')
                vhd_created = True
                self.run_ps_int(new_vm + f'-VHDPath {q(vhd_path)}')
                vm_created = True

            # Drop the adapter New-VM adds by default; NICs are added below.
            self.run_ps_int(
                f'Remove-VMNetworkAdapter -VMName {q(vm_name)} -Name "Network Adapter" '
                f'-ErrorAction SilentlyContinue'
            )

            command = f'Set-VMProcessor -VMName {q(vm_name)} -Count {q(cpus)}; '
            if generation == "2":
                command += f'Set-VMFirmware -VMName {q(vm_name)} -EnableSecureBoot Off; '
            self.run_ps_int(command)

            for idx, nic in enumerate(data["nics"]):
                adapter_name = f"NIC{idx+1}"
                self.run_ps_int(
                    f'Add-VMNetworkAdapter -VMName {q(vm_name)} '
                    f'-SwitchName {q(network_switch)} -Name {q(adapter_name)}'
                )
                self.run_ps_int(
                    f'Set-VMNetworkAdapter -VMName {q(vm_name)} '
                    f'-Name {q(adapter_name)} -StaticMacAddress {q(nic["mac"])}'
                )
                self.run_ps_int(
                    f'Set-VMNetworkAdapterVlan -VMName {q(vm_name)} '
                    f'-VMNetworkAdapterName {q(adapter_name)} -Access -VlanId {q(nic["vlan"])}'
                )

            self.run_ps_int(f'Start-VM -Name {q(vm_name)}')
        except Exception as e:
            cleanup = []
            if vm_created:
                cleanup.append(f'Remove-VM -Name {q(vm_name)} -Force')
            if vhd_created:
                cleanup.append(f'Remove-Item -Path {q(vhd_path)} -Force')
            cleanup_errors = self._run_cleanup(cleanup)
            message = str(e)
            if cleanup_errors:
                # Keep the original cause first; cleanup problems are extra.
                message += " (cleanup also failed: " + "; ".join(cleanup_errors) + ")"
            fail(message)
        succeed({"status": "success", "message": "Instance created"})

    def start(self):
        """Start the virtual machine."""
        self.run_ps(f'Start-VM -Name {self._ps_quote(self.data["vmname"])}')
        succeed({"status": "success", "message": "Instance started"})

    def stop(self):
        """Force-stop the virtual machine; a missing VM counts as stopped."""
        try:
            self.run_ps_int(f'Stop-VM -Name {self._ps_quote(self.data["vmname"])} -Force')
        except Exception as e:
            if not self.vm_not_present(e):
                fail(str(e))
        succeed({"status": "success", "message": "Instance stopped"})

    def reboot(self):
        """Force-restart the virtual machine."""
        self.run_ps(f'Restart-VM -Name {self._ps_quote(self.data["vmname"])} -Force')
        succeed({"status": "success", "message": "Instance rebooted"})

    def status(self):
        """Report the power state as ``poweron``, ``poweroff`` or ``unknown``."""
        command = f'(Get-VM -Name {self._ps_quote(self.data["vmname"])}).State'
        state = self.run_ps(command).strip().lower()
        power_state = {"running": "poweron", "off": "poweroff"}.get(state, "unknown")
        succeed({"status": "success", "power_state": power_state})

    def delete(self):
        """Remove the virtual machine; a missing VM counts as deleted."""
        try:
            self.run_ps_int(f'Remove-VM -Name {self._ps_quote(self.data["vmname"])} -Force')
        except Exception as e:
            if not self.vm_not_present(e):
                fail(str(e))
        succeed({"status": "success", "message": "Instance deleted"})

    def suspend(self):
        """Suspend the virtual machine."""
        self.run_ps(f'Suspend-VM -Name {self._ps_quote(self.data["vmname"])}')
        succeed({"status": "success", "message": "Instance suspended"})

    def resume(self):
        """Resume the virtual machine."""
        self.run_ps(f'Resume-VM -Name {self._ps_quote(self.data["vmname"])}')
        succeed({"status": "success", "message": "Instance resumed"})

    def _require_snapshot_name(self):
        """Return the snapshot name from the payload, failing if it is empty."""
        snapshot_name = self.data.get("snapshot_name", "")
        if not snapshot_name:
            fail("Missing required field in JSON: snapshot_name")
        return snapshot_name

    def create_snapshot(self):
        """Create a checkpoint named after the ``snapshot_name`` parameter."""
        snapshot_name = self._require_snapshot_name()
        q = self._ps_quote
        command = f'Checkpoint-VM -VMName {q(self.data["vmname"])} -SnapshotName {q(snapshot_name)}'
        self.run_ps(command)
        succeed({"status": "success", "message": f"Snapshot '{snapshot_name}' created"})

    def list_snapshots(self):
        """Report the VM's snapshots as a list of name/creation-time objects.

        The result is always a list: empty when the command prints nothing,
        and a one-element list when ConvertTo-Json emits a single object.
        """
        command = (
            f'Get-VMSnapshot -VMName {self._ps_quote(self.data["vmname"])} '
            '| Select-Object Name, @{Name="CreationTime";Expression={$_.CreationTime.ToString("s")}} '
            '| ConvertTo-Json'
        )
        output = self.run_ps(command).strip()
        snapshots = json.loads(output) if output else []
        if isinstance(snapshots, dict):
            snapshots = [snapshots]
        succeed({"status": "success", "printmessage": "true", "message": snapshots})

    def restore_snapshot(self):
        """Restore the checkpoint named by the ``snapshot_name`` parameter."""
        snapshot_name = self._require_snapshot_name()
        q = self._ps_quote
        command = f'Restore-VMSnapshot -VMName {q(self.data["vmname"])} -Name {q(snapshot_name)} -Confirm:$false'
        self.run_ps(command)
        succeed({"status": "success", "message": f"Snapshot '{snapshot_name}' restored"})

    def delete_snapshot(self):
        """Delete the checkpoint named by the ``snapshot_name`` parameter."""
        snapshot_name = self._require_snapshot_name()
        q = self._ps_quote
        command = f'Remove-VMSnapshot -VMName {q(self.data["vmname"])} -Name {q(snapshot_name)} -Confirm:$false'
        self.run_ps(command)
        succeed({"status": "success", "message": f"Snapshot '{snapshot_name}' deleted"})
def main():
    """Command-line entry point: ``script.py <operation> <json-file-path>``.

    Builds a ``HyperVManager`` from the JSON payload file and dispatches the
    (case-insensitive) operation name to the matching manager method. Every
    failure, including an unexpected error while constructing the manager,
    is reported as a JSON error document through ``fail``.
    """
    if len(sys.argv) < 3:
        fail("Usage: script.py <operation> '<json-file-path>'")

    operation = sys.argv[1].lower()
    json_file_path = sys.argv[2]

    try:
        manager = HyperVManager(json_file_path)
    except FileNotFoundError:
        fail(f"JSON file not found: {json_file_path}")
    except json.JSONDecodeError:
        fail("Invalid JSON in file")
    except Exception as e:
        # Anything else (e.g. the WinRM session could not be built) must
        # still produce a JSON error instead of a bare traceback.
        fail(f"Failed to initialize Hyper-V manager: {e}")

    operations = {
        "create": manager.create,
        "start": manager.start,
        "stop": manager.stop,
        "reboot": manager.reboot,
        "delete": manager.delete,
        "status": manager.status,
        "suspend": manager.suspend,
        "resume": manager.resume,
        "listsnapshots": manager.list_snapshots,
        "createsnapshot": manager.create_snapshot,
        "restoresnapshot": manager.restore_snapshot,
        "deletesnapshot": manager.delete_snapshot,
    }

    handler = operations.get(operation)
    if handler is None:
        fail("Invalid action")

    try:
        handler()
    except Exception as e:
        fail(str(e))


if __name__ == "__main__":
    main()