blob: e34ae53fcc750f3a926168f5be08abc2b4c518f0 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Import Local Modules
from marvin.cloudstackTestCase import cloudstackTestCase
from marvin.cloudstackAPI import (
getDiagnosticsData,
stopSystemVm,
rebootSystemVm,
destroySystemVm,
updateConfiguration,
)
from marvin.lib.utils import (
cleanup_resources,
get_process_status,
get_host_credentials,
wait_until,
)
from marvin.lib.base import UserData, Network
from marvin.lib.common import (
get_zone,
list_hosts,
list_routers,
list_ssvms,
list_zones,
list_vlan_ipranges,
createEnabledNetworkOffering,
)
from marvin.codes import PASS
from nose.plugins.attrib import attr
import telnetlib
import logging
import base64
import os
import urllib
import zipfile
import uuid
import shutil
# Import System modules
import time
_multiprocess_shared_ = True
class TestSystemVMUserData(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.testClient = super(TestSystemVMUserData, cls).getClsTestClient()
cls.api_client = cls.testClient.getApiClient()
# Fill services from the external config file
cls.testData = cls.testClient.getParsedTestDataConfig()
# Enable user data and set the script to be run on SSVM
cmd = updateConfiguration.updateConfigurationCmd()
cmd.name = "systemvm.userdata.enabled"
cmd.value = "true"
cls.api_client.updateConfiguration(cmd)
@classmethod
def tearDownClass(cls):
# Disable user data
cmd = updateConfiguration.updateConfigurationCmd()
cmd.name = "systemvm.userdata.enabled"
cmd.value = "false"
cls.api_client.updateConfiguration(cmd)
def setUp(self):
test_case = super(TestSystemVMUserData, self)
self.apiclient = self.testClient.getApiClient()
self.hypervisor = self.testClient.getHypervisorInfo()
self.cleanup = []
self.config = test_case.getClsConfig()
self.services = self.testClient.getParsedTestDataConfig()
self.zone = get_zone(self.apiclient, self.testClient.getZoneForTests())
self.logger = logging.getLogger("TestSystemVMUserData")
self.stream_handler = logging.StreamHandler()
self.logger.setLevel(logging.DEBUG)
self.logger.addHandler(self.stream_handler)
def tearDown(self):
if self.userdata_id:
UserData.delete(self.apiclient, self.userdata_id)
self.userdata_id = None
try:
cleanup_resources(self.apiclient, self.cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
def waitForSystemVMAgent(self, vmname):
def checkRunningAgent():
list_host_response = list_hosts(self.apiclient, name=vmname)
if isinstance(list_host_response, list):
return list_host_response[0].state == "Up", None
return False, None
res, _ = wait_until(3, 300, checkRunningAgent)
if not res:
raise Exception("Failed to wait for SSVM agent to be Up")
def checkForRunningSystemVM(self, ssvm, ssvm_type=None):
if not ssvm:
return None
def checkRunningState():
if not ssvm_type:
response = list_ssvms(self.apiclient, id=ssvm.id)
else:
response = list_ssvms(
self.apiclient, zoneid=self.zone.id, systemvmtype=ssvm_type
)
if isinstance(response, list):
ssvm_response = response[0]
return ssvm_response.state == "Running", ssvm_response
return False, None
res, ssvm_response = wait_until(3, 300, checkRunningState)
if not res:
self.fail("Failed to reach systemvm state to Running")
return ssvm_response
def register_userdata(
self, userdata_name, global_setting_name, vm_type_display_name
):
"""Helper method to register userdata and configure the global setting
Args:
userdata_name: Name for the userdata entry
global_setting_name: Global setting name to update (e.g., 'secstorage.vm.userdata', 'console.proxy.vm.userdata', 'virtual.router.userdata')
vm_type_display_name: Display name for the VM type (e.g., 'SSVM', 'CPVM', 'VR')
Returns:
UserData object
"""
userdata_script = f"""#!/bin/bash
echo "User data script ran successfully on {vm_type_display_name}" > /tmp/userdata.txt
"""
b64_encoded_userdata = base64.b64encode(userdata_script.encode("utf-8")).decode(
"utf-8"
)
# Create a userdata entry
try:
userdata = UserData.register(
self.apiclient, name=userdata_name, userdata=b64_encoded_userdata
)
userdata_id = userdata["userdata"]["id"]
except Exception as e:
if "already exists" in str(e):
self.debug("Userdata already exists, getting it")
userdata = UserData.list(
self.apiclient, name=userdata_name, listall=True
)[0]
userdata_id = userdata.id
else:
self.fail("Failed to register userdata: %s" % e)
# Update global configuration to use this userdata
cmd = updateConfiguration.updateConfigurationCmd()
cmd.name = global_setting_name
cmd.value = userdata_id
self.apiclient.updateConfiguration(cmd)
self.debug(
"Updated global setting %s with userdata ID: %s"
% (global_setting_name, userdata.id)
)
return userdata_id
def download_and_verify_diagnostics_data(
self, target_id, vm_type_display_name, expected_content, retries=4
):
"""Helper method to download and verify diagnostics data
Args:
target_id: ID of the target VM/router
vm_type_display_name: Display name for log messages (e.g., 'SSVM', 'CPVM', 'VR')
expected_content: Expected content to verify in the userdata file
retries: Number of retries for getDiagnosticsData (default: 4)
"""
# Create a random temporary directory for this test
random_suffix = uuid.uuid4().hex[:8]
vm_type_prefix = vm_type_display_name.lower()
temp_dir = f"/tmp/{vm_type_prefix}-{random_suffix}"
os.makedirs(temp_dir, exist_ok=True)
# Download the file created by userdata script using the getDiagnosticsData command
cmd = getDiagnosticsData.getDiagnosticsDataCmd()
cmd.targetid = target_id
cmd.files = "/tmp/userdata.txt"
# getDiagnosticsData command takes some time to work after a VM is started
response = None
while retries > -1:
try:
response = self.apiclient.getDiagnosticsData(cmd)
break # Success, exit retry loop
except Exception as e:
if retries >= 0:
retries = retries - 1
self.debug(
"getDiagnosticsData failed (retries left: %d): %s"
% (retries + 1, e)
)
if retries > -1:
time.sleep(30)
continue
# If all retries exhausted, re-raise the exception
self.fail("Failed to get diagnostics data after retries: %s" % e)
# Download response.url file to temporary directory and extract it
zip_file_path = os.path.join(temp_dir, "userdata.zip")
extracted_file_path = os.path.join(temp_dir, "userdata.txt")
self.debug(
"Downloading userdata file from %s to %s"
% (vm_type_display_name, zip_file_path)
)
try:
urllib.request.urlretrieve(response.url, zip_file_path)
except Exception as e:
self.fail(
"Failed to download userdata file from %s: %s"
% (vm_type_display_name, e)
)
self.debug(
"Downloaded userdata file from %s: %s"
% (vm_type_display_name, zip_file_path)
)
try:
with zipfile.ZipFile(zip_file_path, "r") as zip_ref:
zip_ref.extractall(temp_dir)
except zipfile.BadZipFile as e:
self.fail("Downloaded userdata file is not a zip file: %s" % e)
self.debug("Extracted userdata file from zip: %s" % extracted_file_path)
# Verify the file contains the expected content
try:
with open(extracted_file_path, "r") as f:
content = f.read().strip()
self.debug("Userdata file content: %s" % content)
self.assertEqual(
expected_content in content,
True,
f"Check that userdata file contains expected content: '{expected_content}'",
)
except FileNotFoundError:
self.fail(
"Userdata file not found in extracted zip at %s" % extracted_file_path
)
except Exception as e:
self.fail("Failed to read userdata file: %s" % e)
finally:
# Clean up temporary directory
try:
if os.path.exists(temp_dir):
shutil.rmtree(temp_dir)
self.debug("Cleaned up temporary directory: %s" % temp_dir)
except Exception as e:
self.debug(
"Failed to clean up temporary directory %s: %s" % (temp_dir, e)
)
def test_userdata_on_systemvm(
self, systemvm_type, userdata_name, vm_type_display_name, global_setting_name
):
"""Helper method to test user data functionality on system VMs
Args:
systemvm_type: Type of system VM ('secondarystoragevm' or 'consoleproxy')
userdata_name: Name for the userdata entry
vm_type_display_name: Display name for log messages (e.g., 'SSVM' or 'CPVM')
global_setting_name: Global setting name for userdata (e.g., 'secstorage.vm.userdata' or 'console.proxy.vm.userdata')
"""
# 1) Register userdata and configure global setting
self.userdata_id = self.register_userdata(
userdata_name, global_setting_name, vm_type_display_name
)
# 2) Get and destroy the system VM to trigger recreation with userdata
list_ssvm_response = list_ssvms(
self.apiclient,
systemvmtype=systemvm_type,
state="Running",
zoneid=self.zone.id,
)
self.assertEqual(
isinstance(list_ssvm_response, list),
True,
"Check list response returns a valid list",
)
ssvm = list_ssvm_response[0]
self.debug("Destroying %s: %s" % (vm_type_display_name, ssvm.id))
cmd = destroySystemVm.destroySystemVmCmd()
cmd.id = ssvm.id
self.apiclient.destroySystemVm(cmd)
# 3) Wait for the system VM to be running again
ssvm_response = self.checkForRunningSystemVM(ssvm, systemvm_type)
self.debug(
"%s state after restart: %s" % (vm_type_display_name, ssvm_response.state)
)
self.assertEqual(
ssvm_response.state,
"Running",
"Check whether %s is running or not" % vm_type_display_name,
)
# Wait for the agent to be up
self.waitForSystemVMAgent(ssvm_response.name)
# 4) Download and verify the diagnostics data
expected_content = (
f"User data script ran successfully on {vm_type_display_name}"
)
self.download_and_verify_diagnostics_data(
ssvm_response.id, vm_type_display_name, expected_content
)
@attr(
tags=["advanced", "advancedns", "smoke", "basic", "sg"],
required_hardware="true",
)
def test_1_userdata_on_ssvm(self):
"""Test user data functionality on SSVM"""
self.test_userdata_on_systemvm(
systemvm_type="secondarystoragevm",
userdata_name="ssvm_userdata",
vm_type_display_name="SSVM",
global_setting_name="secstorage.vm.userdata",
)
@attr(
tags=["advanced", "advancedns", "smoke", "basic", "sg"],
required_hardware="true",
)
def test_2_userdata_on_cpvm(self):
"""Test user data functionality on CPVM"""
self.test_userdata_on_systemvm(
systemvm_type="consoleproxy",
userdata_name="cpvm_userdata",
vm_type_display_name="CPVM",
global_setting_name="console.proxy.vm.userdata",
)
@attr(
tags=["advanced", "advancedns", "smoke", "basic", "sg"],
required_hardware="true",
)
def test_3_userdata_on_vr(self):
"""Test user data functionality on VR"""
# 1) Register userdata and configure global setting
self.userdata_id = self.register_userdata("vr_userdata", "virtual.router.userdata", "VR")
# 2) Create an isolated network which will trigger VR creation with userdata
result = createEnabledNetworkOffering(
self.apiclient, self.testData["nw_off_isolated_persistent"]
)
assert result[0] == PASS, (
"Network offering creation/enabling failed due to %s" % result[2]
)
isolated_persistent_network_offering = result[1]
# Create an isolated network
self.network = Network.create(
self.apiclient,
self.testData["isolated_network"],
networkofferingid=isolated_persistent_network_offering.id,
zoneid=self.zone.id,
)
self.assertIsNotNone(self.network, "Network creation failed")
self.cleanup.append(self.network)
self.cleanup.append(isolated_persistent_network_offering)
# 3) Get the VR and verify it's running
routers = list_routers(
self.apiclient, networkid=self.network.id, state="Running"
)
self.assertEqual(
isinstance(routers, list),
True,
"Check list router response returns a valid list",
)
self.assertNotEqual(len(routers), 0, "Check list router response")
router = routers[0]
self.debug("Found VR: %s" % router.id)
# 4) Download and verify the diagnostics data
# VR doesn't need retries as it's freshly created
expected_content = "User data script ran successfully on VR"
self.download_and_verify_diagnostics_data(router.id, "VR", expected_content)