# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
| |
# Import System modules
import base64
import logging
import os
import shutil
import telnetlib
import time
import urllib
import urllib.request
import uuid
import zipfile

# Import third-party modules
from nose.plugins.attrib import attr

# Import Local Modules
from marvin.cloudstackTestCase import cloudstackTestCase
from marvin.cloudstackAPI import (
    getDiagnosticsData,
    stopSystemVm,
    rebootSystemVm,
    destroySystemVm,
    updateConfiguration,
)
from marvin.lib.utils import (
    cleanup_resources,
    get_process_status,
    get_host_credentials,
    wait_until,
)
from marvin.lib.base import UserData, Network
from marvin.lib.common import (
    get_zone,
    list_hosts,
    list_routers,
    list_ssvms,
    list_zones,
    list_vlan_ipranges,
    createEnabledNetworkOffering,
)
from marvin.codes import PASS
| |
# Module-level marker read by nose's multiprocess plugin.
# NOTE(review): per the nose docs this declares the context's fixtures as
# shareable between worker processes -- confirm that is still the intent here.
_multiprocess_shared_ = True
| |
| |
class TestSystemVMUserData(cloudstackTestCase):
    """Check that registered user data is executed on system VMs and routers.

    Every test registers a small shell script as user data, points the
    matching global setting at it, gets a fresh SSVM / CPVM / VR created and
    then downloads /tmp/userdata.txt from that VM via getDiagnosticsData to
    verify the marker line the script writes.
    """

    @classmethod
    def setUpClass(cls):
        """Fetch the API client and test data, then enable system VM user data."""
        cls.testClient = super(TestSystemVMUserData, cls).getClsTestClient()
        cls.api_client = cls.testClient.getApiClient()

        # Fill services from the external config file
        cls.testData = cls.testClient.getParsedTestDataConfig()

        # Enable user data support for system VMs
        cmd = updateConfiguration.updateConfigurationCmd()
        cmd.name = "systemvm.userdata.enabled"
        cmd.value = "true"
        cls.api_client.updateConfiguration(cmd)

    @classmethod
    def tearDownClass(cls):
        """Switch system VM user data support back off."""
        cmd = updateConfiguration.updateConfigurationCmd()
        cmd.name = "systemvm.userdata.enabled"
        cmd.value = "false"
        cls.api_client.updateConfiguration(cmd)

    def setUp(self):
        """Prepare per-test clients, zone, cleanup list and logger."""
        test_case = super(TestSystemVMUserData, self)
        self.apiclient = self.testClient.getApiClient()
        self.hypervisor = self.testClient.getHypervisorInfo()
        self.cleanup = []
        # Defined up front so tearDown works even when a test fails before
        # it managed to register any user data.
        self.userdata_id = None
        self.config = test_case.getClsConfig()
        self.services = self.testClient.getParsedTestDataConfig()
        self.zone = get_zone(self.apiclient, self.testClient.getZoneForTests())

        self.logger = logging.getLogger("TestSystemVMUserData")
        self.stream_handler = logging.StreamHandler()
        self.logger.setLevel(logging.DEBUG)
        self.logger.addHandler(self.stream_handler)

    def tearDown(self):
        """Delete the registered user data and every resource in self.cleanup.

        Both cleanup steps are always attempted; failures are collected and
        reported together so that one failing step cannot leak the resources
        handled by the other.

        Raises:
            Exception: if deleting the user data or any tracked resource fails.
        """
        errors = []

        if self.userdata_id:
            try:
                UserData.delete(self.apiclient, self.userdata_id)
            except Exception as e:
                errors.append(e)
            self.userdata_id = None

        try:
            cleanup_resources(self.apiclient, self.cleanup)
        except Exception as e:
            errors.append(e)

        # The logger is looked up by name and therefore shared between tests;
        # drop the handler added in setUp so output is not duplicated.
        self.logger.removeHandler(self.stream_handler)

        if errors:
            raise Exception(
                "Warning: Exception during cleanup : %s"
                % "; ".join(str(e) for e in errors)
            )

    def waitForSystemVMAgent(self, vmname):
        """Block until the host entry named *vmname* reports state "Up".

        Args:
            vmname: Name of the system VM whose agent (host record) is awaited

        Raises:
            Exception: if the agent is not Up once wait_until gives up.
        """

        def checkRunningAgent():
            hosts = list_hosts(self.apiclient, name=vmname)
            # Guard against both a non-list reply and an empty list.
            if isinstance(hosts, list) and hosts:
                return hosts[0].state == "Up", None
            return False, None

        # NOTE(review): presumably a 3 second interval for up to 300 attempts;
        # confirm against marvin.lib.utils.wait_until.
        res, _ = wait_until(3, 300, checkRunningAgent)
        if not res:
            raise Exception(
                "Failed to wait for system VM agent %s to be Up" % vmname
            )

    def checkForRunningSystemVM(self, ssvm, ssvm_type=None):
        """Wait for a system VM to reach the "Running" state.

        Args:
            ssvm: System VM object; when falsy nothing is checked
            ssvm_type: If given, poll by zone and system VM type instead of by
                the id of *ssvm* (needed after a destroy, when the id changes)

        Returns:
            The list_ssvms entry that was seen Running, or None if *ssvm* is falsy.
        """
        if not ssvm:
            return None

        def checkRunningState():
            if ssvm_type:
                response = list_ssvms(
                    self.apiclient, zoneid=self.zone.id, systemvmtype=ssvm_type
                )
            else:
                response = list_ssvms(self.apiclient, id=ssvm.id)

            if isinstance(response, list) and response:
                ssvm_response = response[0]
                return ssvm_response.state == "Running", ssvm_response
            return False, None

        res, ssvm_response = wait_until(3, 300, checkRunningState)
        if not res:
            self.fail("Failed to reach systemvm state to Running")
        return ssvm_response

    def register_userdata(
        self, userdata_name, global_setting_name, vm_type_display_name
    ):
        """Helper method to register userdata and configure the global setting

        Args:
            userdata_name: Name for the userdata entry
            global_setting_name: Global setting name to update (e.g., 'secstorage.vm.userdata', 'console.proxy.vm.userdata', 'virtual.router.userdata')
            vm_type_display_name: Display name for the VM type (e.g., 'SSVM', 'CPVM', 'VR')

        Returns:
            ID of the registered (or already existing) userdata entry
        """
        userdata_script = (
            "#!/bin/bash\n"
            f'echo "User data script ran successfully on {vm_type_display_name}"'
            " > /tmp/userdata.txt\n"
        )
        b64_encoded_userdata = base64.b64encode(
            userdata_script.encode("utf-8")
        ).decode("utf-8")

        # Create a userdata entry, reusing one left behind by an earlier run
        try:
            userdata = UserData.register(
                self.apiclient, name=userdata_name, userdata=b64_encoded_userdata
            )
            userdata_id = userdata["userdata"]["id"]
        except Exception as e:
            if "already exists" not in str(e):
                self.fail("Failed to register userdata: %s" % e)
            self.debug("Userdata already exists, getting it")
            existing = UserData.list(
                self.apiclient, name=userdata_name, listall=True
            )
            if not existing:
                self.fail(
                    "Userdata %s reported as existing but could not be listed"
                    % userdata_name
                )
            userdata_id = existing[0].id

        # Update global configuration to use this userdata
        cmd = updateConfiguration.updateConfigurationCmd()
        cmd.name = global_setting_name
        cmd.value = userdata_id
        self.apiclient.updateConfiguration(cmd)
        # Log the resolved id: it is the only value defined on both paths above.
        self.debug(
            "Updated global setting %s with userdata ID: %s"
            % (global_setting_name, userdata_id)
        )

        return userdata_id

    def _get_diagnostics_data_with_retries(self, cmd, retries, retry_interval=30):
        """Call getDiagnosticsData, retrying on failure; fail the test when exhausted.

        Args:
            cmd: Prepared getDiagnosticsDataCmd
            retries: Number of additional attempts after the first one
            retry_interval: Seconds to sleep between attempts

        Returns:
            The getDiagnosticsData response of the first successful attempt.
        """
        attempts_left = max(retries, 0) + 1
        last_error = None
        while attempts_left > 0:
            try:
                return self.apiclient.getDiagnosticsData(cmd)
            except Exception as e:
                last_error = e
                attempts_left -= 1
                self.debug(
                    "getDiagnosticsData failed (retries left: %d): %s"
                    % (attempts_left, e)
                )
                if attempts_left > 0:
                    time.sleep(retry_interval)
        self.fail("Failed to get diagnostics data after retries: %s" % last_error)

    def download_and_verify_diagnostics_data(
        self, target_id, vm_type_display_name, expected_content, retries=4
    ):
        """Helper method to download and verify diagnostics data

        Args:
            target_id: ID of the target VM/router
            vm_type_display_name: Display name for log messages (e.g., 'SSVM', 'CPVM', 'VR')
            expected_content: Expected content to verify in the userdata file
            retries: Number of retries for getDiagnosticsData (default: 4)
        """
        # Create a random temporary directory for this test
        random_suffix = uuid.uuid4().hex[:8]
        vm_type_prefix = vm_type_display_name.lower()
        temp_dir = f"/tmp/{vm_type_prefix}-{random_suffix}"
        os.makedirs(temp_dir, exist_ok=True)

        # Everything below runs under one try/finally so the temporary
        # directory is removed no matter which step fails.
        try:
            # Download the file created by userdata script using the
            # getDiagnosticsData command
            cmd = getDiagnosticsData.getDiagnosticsDataCmd()
            cmd.targetid = target_id
            cmd.files = "/tmp/userdata.txt"

            # getDiagnosticsData command takes some time to work after a VM
            # is started, hence the retries
            response = self._get_diagnostics_data_with_retries(cmd, retries)

            # Download response.url file to temporary directory and extract it
            zip_file_path = os.path.join(temp_dir, "userdata.zip")
            extracted_file_path = os.path.join(temp_dir, "userdata.txt")

            self.debug(
                "Downloading userdata file from %s to %s"
                % (vm_type_display_name, zip_file_path)
            )
            try:
                urllib.request.urlretrieve(response.url, zip_file_path)
            except Exception as e:
                self.fail(
                    "Failed to download userdata file from %s: %s"
                    % (vm_type_display_name, e)
                )
            self.debug(
                "Downloaded userdata file from %s: %s"
                % (vm_type_display_name, zip_file_path)
            )

            try:
                with zipfile.ZipFile(zip_file_path, "r") as zip_ref:
                    zip_ref.extractall(temp_dir)
            except zipfile.BadZipFile as e:
                self.fail("Downloaded userdata file is not a zip file: %s" % e)
            self.debug("Extracted userdata file from zip: %s" % extracted_file_path)

            # Only the read is guarded here; the content assertion happens
            # afterwards so its failure is not reported as a read error.
            try:
                with open(extracted_file_path, "r") as f:
                    content = f.read().strip()
            except FileNotFoundError:
                self.fail(
                    "Userdata file not found in extracted zip at %s"
                    % extracted_file_path
                )
            except Exception as e:
                self.fail("Failed to read userdata file: %s" % e)

            # Verify the file contains the expected content
            self.debug("Userdata file content: %s" % content)
            self.assertIn(
                expected_content,
                content,
                f"Check that userdata file contains expected content: '{expected_content}'",
            )
        finally:
            # Clean up temporary directory (best effort, never masks a failure)
            try:
                if os.path.exists(temp_dir):
                    shutil.rmtree(temp_dir)
                    self.debug("Cleaned up temporary directory: %s" % temp_dir)
            except Exception as e:
                self.debug(
                    "Failed to clean up temporary directory %s: %s" % (temp_dir, e)
                )

    def test_userdata_on_systemvm(
        self, systemvm_type, userdata_name, vm_type_display_name, global_setting_name
    ):
        """Helper method to test user data functionality on system VMs

        Args:
            systemvm_type: Type of system VM ('secondarystoragevm' or 'consoleproxy')
            userdata_name: Name for the userdata entry
            vm_type_display_name: Display name for log messages (e.g., 'SSVM' or 'CPVM')
            global_setting_name: Global setting name for userdata (e.g., 'secstorage.vm.userdata' or 'console.proxy.vm.userdata')
        """
        # 1) Register userdata and configure global setting
        self.userdata_id = self.register_userdata(
            userdata_name, global_setting_name, vm_type_display_name
        )

        # 2) Get and destroy the system VM to trigger recreation with userdata
        list_ssvm_response = list_ssvms(
            self.apiclient,
            systemvmtype=systemvm_type,
            state="Running",
            zoneid=self.zone.id,
        )
        self.assertEqual(
            isinstance(list_ssvm_response, list),
            True,
            "Check list response returns a valid list",
        )
        self.assertNotEqual(
            len(list_ssvm_response),
            0,
            "Check list response contains a running %s" % vm_type_display_name,
        )

        ssvm = list_ssvm_response[0]
        self.debug("Destroying %s: %s" % (vm_type_display_name, ssvm.id))
        cmd = destroySystemVm.destroySystemVmCmd()
        cmd.id = ssvm.id
        self.apiclient.destroySystemVm(cmd)

        # 3) Wait for the system VM to be running again
        ssvm_response = self.checkForRunningSystemVM(ssvm, systemvm_type)
        self.debug(
            "%s state after restart: %s" % (vm_type_display_name, ssvm_response.state)
        )
        self.assertEqual(
            ssvm_response.state,
            "Running",
            "Check whether %s is running or not" % vm_type_display_name,
        )
        # Wait for the agent to be up
        self.waitForSystemVMAgent(ssvm_response.name)

        # 4) Download and verify the diagnostics data
        expected_content = (
            f"User data script ran successfully on {vm_type_display_name}"
        )
        self.download_and_verify_diagnostics_data(
            ssvm_response.id, vm_type_display_name, expected_content
        )

    # The helper above keeps its historical "test_" name for compatibility but
    # needs arguments; mark it as a non-test so runners that honour __test__
    # (nose, pytest) do not collect and call it without them.
    test_userdata_on_systemvm.__test__ = False

    @attr(
        tags=["advanced", "advancedns", "smoke", "basic", "sg"],
        required_hardware="true",
    )
    def test_1_userdata_on_ssvm(self):
        """Test user data functionality on SSVM"""
        self.test_userdata_on_systemvm(
            systemvm_type="secondarystoragevm",
            userdata_name="ssvm_userdata",
            vm_type_display_name="SSVM",
            global_setting_name="secstorage.vm.userdata",
        )

    @attr(
        tags=["advanced", "advancedns", "smoke", "basic", "sg"],
        required_hardware="true",
    )
    def test_2_userdata_on_cpvm(self):
        """Test user data functionality on CPVM"""
        self.test_userdata_on_systemvm(
            systemvm_type="consoleproxy",
            userdata_name="cpvm_userdata",
            vm_type_display_name="CPVM",
            global_setting_name="console.proxy.vm.userdata",
        )

    @attr(
        tags=["advanced", "advancedns", "smoke", "basic", "sg"],
        required_hardware="true",
    )
    def test_3_userdata_on_vr(self):
        """Test user data functionality on VR"""
        # 1) Register userdata and configure global setting
        self.userdata_id = self.register_userdata(
            "vr_userdata", "virtual.router.userdata", "VR"
        )

        # 2) Create an isolated network which will trigger VR creation with userdata
        result = createEnabledNetworkOffering(
            self.apiclient, self.testData["nw_off_isolated_persistent"]
        )
        self.assertEqual(
            result[0],
            PASS,
            "Network offering creation/enabling failed due to %s" % result[2],
        )
        isolated_persistent_network_offering = result[1]
        # Track the offering right away so it is removed even if the network
        # creation below fails.
        self.cleanup.append(isolated_persistent_network_offering)

        # Create an isolated network
        self.network = Network.create(
            self.apiclient,
            self.testData["isolated_network"],
            networkofferingid=isolated_persistent_network_offering.id,
            zoneid=self.zone.id,
        )
        self.assertIsNotNone(self.network, "Network creation failed")
        # Inserted at the front so the network keeps its position ahead of
        # the offering it uses in the cleanup list, as in the original order.
        self.cleanup.insert(0, self.network)

        # 3) Get the VR and verify it's running
        routers = list_routers(
            self.apiclient, networkid=self.network.id, state="Running"
        )
        self.assertEqual(
            isinstance(routers, list),
            True,
            "Check list router response returns a valid list",
        )
        self.assertNotEqual(len(routers), 0, "Check list router response")
        router = routers[0]
        self.debug("Found VR: %s" % router.id)

        # 4) Download and verify the diagnostics data
        # The helper's default retry budget is used here as well.
        expected_content = "User data script ran successfully on VR"
        self.download_and_verify_diagnostics_data(router.id, "VR", expected_content)