| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import os |
| import subprocess |
| import socket |
| |
| from nose.plugins.attrib import attr |
| |
| from marvin.cloudstackTestCase import cloudstackTestCase |
| from marvin.lib.base import (Account, ServiceOffering, VirtualMachine, NetworkOffering, Network |
| , SecurityGroup) |
| from marvin.cloudstackAPI import (authorizeSecurityGroupIngress |
| , authorizeSecurityGroupEgress, revokeSecurityGroupIngress, revokeSecurityGroupEgress) |
| from marvin.lib.common import get_zone, get_template |
| from marvin.lib.decoratorGenerators import skipTestIf |
| from .common import cleanup_resources, not_tungsten_fabric_zone |
| |
| |
| class Services: |
| """Test Tungsten Plugin |
| """ |
| |
| def __init__(self): |
| self.services = { |
| "network_offering": { |
| "name": "TungstenSharedNetworkOffering", |
| "displaytext": "TungstenSharedNetworkOffering", |
| "guestiptype": "Shared", |
| "traffictype": "GUEST", |
| "supportedservices": [ |
| "Dhcp", |
| "Dns", |
| "Connectivity", |
| "SecurityGroup" |
| ], |
| "serviceProviderList": { |
| "Dhcp": "Tungsten", |
| "Dns": "Tungsten", |
| "Connectivity": "Tungsten", |
| "SecurityGroup": "Tungsten" |
| }, |
| "useTungsten": "on", |
| "specifyVlan": "true", |
| "specifyIpRanges": "true" |
| }, |
| "networks": { |
| "network1": { |
| "name": "TF-Network1", |
| "displaytext": "TF-Network1", |
| "gateway": "192.168.6.1", |
| "netmask": "255.255.255.0", |
| "startip": "192.168.6.100", |
| "endip": "192.168.6.200", |
| "vlan": "untagged" |
| } |
| }, |
| "account": { |
| "email": "test@test.com", |
| "firstname": "Test", |
| "lastname": "User", |
| "username": "test", |
| "password": "password", |
| }, |
| "virtual_machines": { |
| "vm1": { |
| "name": "vm1", |
| "displayname": "vm1", |
| "username": "root", |
| "password": "password", |
| "ssh_port": 22, |
| "hypervisor": 'KVM', |
| "privateport": 22, |
| "publicport": 22, |
| "protocol": 'TCP' |
| |
| }, |
| "vm2": { |
| "name": "vm2", |
| "displayname": "vm2", |
| "username": "root", |
| "password": "password", |
| "ssh_port": 22, |
| "hypervisor": 'KVM', |
| "privateport": 22, |
| "publicport": 22, |
| "protocol": 'TCP' |
| }, |
| "vm3": { |
| "name": "vm3", |
| "displayname": "vm3", |
| "username": "root", |
| "password": "password", |
| "ssh_port": 22, |
| "hypervisor": 'KVM', |
| "privateport": 22, |
| "publicport": 22, |
| "protocol": 'TCP' |
| } |
| }, |
| "security_groups": { |
| "security_group1": { |
| "name": "sg1" |
| }, |
| "security_group2": { |
| "name": "sg2" |
| }, |
| "security_group3": { |
| "name": "sg3" |
| } |
| }, |
| "ostype": 'CentOS 5.6 (64-bit)' |
| } |
| |
| |
| class TestSharedZone(cloudstackTestCase): |
| @classmethod |
| def setUpClass(cls): |
| cls.api_client = cls.clstestclient.getApiClient() |
| cls.services = Services().services |
| cls.test_data = cls.clstestclient.getParsedTestDataConfig() |
| cls.zone = get_zone(cls.api_client, cls.clstestclient.getZoneForTests()) |
| cls.template = get_template(cls.api_client, cls.zone.id, cls.test_data["ostype"]) |
| cls.not_tungsten_fabric_zone = not_tungsten_fabric_zone(cls.api_client, cls.zone.id) |
| cls._cleanup = [] |
| |
| @classmethod |
| def tearDownClass(self): |
| try: |
| cleanup_resources(self.api_client, self.zone.id, self._cleanup) |
| except Exception as e: |
| raise Exception("Warning: Exception during cleanup : %s" % e) |
| return |
| |
| def setUp(self): |
| self.account = Account.create(self.api_client, self.services["account"], admin=True) |
| self.service_offering = ServiceOffering.create(self.api_client, self.test_data["service_offering"]) |
| self.cleanup = [self.account, self.service_offering] |
| return |
| |
| def tearDown(self): |
| try: |
| cleanup_resources(self.api_client, self.zone.id, reversed(self.cleanup)) |
| except Exception as e: |
| raise Exception("Warning: Exception during cleanup : %s" % e) |
| return |
| |
| def getLocalMachineIpAddress(self): |
| """ Get IP address of the machine on which test case is running """ |
| socket_ = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) |
| socket_.connect(('8.8.8.8', 0)) |
| return socket_.getsockname()[0] |
| |
| @skipTestIf("not_tungsten_fabric_zone") |
| @attr(tags=["tungstenfabric"], required_hardware="false") |
| def test_01_test_default_security_group_rule(self): |
| |
| # Validate the following: |
| # 1. deploy vm |
| # 2. ping vm from test machine ( should fail as default security group rule ) |
| # 3. create icmp ingress security group rule with test machine ip address cidr |
| # 4. ping vm from test machine again ( should success as icmp ingress security group rule) |
| # 5. create ssh ingress security group rule with test machine ip address cidr |
| # 6. ssh to vm from test machine |
| # 7. ping google from vm ( should success as default security group rule ) |
| |
| network_offering = NetworkOffering.create(self.api_client, self.services["network_offering"]) |
| network_offering.update(self.api_client, state='Enabled') |
| self.cleanup.append(network_offering) |
| |
| # create network |
| network = Network.create( |
| self.api_client, |
| self.services["networks"]["network1"], |
| networkofferingid=network_offering.id, |
| zoneid=self.zone.id, |
| accountid=self.account.name, |
| domainid=self.account.domainid |
| ) |
| self.cleanup.append(network) |
| |
| # create security group |
| security_group = SecurityGroup.create( |
| self.api_client, |
| self.services["security_groups"]["security_group1"], |
| account=self.account.name, |
| domainid=self.account.domainid |
| ) |
| self.cleanup.append(security_group) |
| |
| # create virtual machine |
| vm = VirtualMachine.create( |
| self.api_client, |
| self.services["virtual_machines"]["vm1"], |
| serviceofferingid=self.service_offering.id, |
| templateid=self.template.id, |
| networkids=[network.id], |
| zoneid=self.zone.id, |
| accountid=self.account.name, |
| domainid=self.account.domainid, |
| securitygroupids=[security_group.id], |
| mode='advanced' |
| ) |
| self.cleanup.append(vm) |
| |
| # ping vm from test machine |
| if os.name == 'nt': |
| result = subprocess.call( |
| ['ping', '-n', '1', vm.ipaddress]) |
| else: |
| result = subprocess.call( |
| ['ping', '-c', '1', vm.ipaddress]) |
| |
| self.assertEqual( |
| result, |
| 1, |
| "Ping should be fail as default security group deny all incoming traffic" |
| ) |
| |
| test_machine_ip_address = self.getLocalMachineIpAddress() |
| cidr = test_machine_ip_address + "/32" |
| |
| # create icmp ingress security group rule with test machine ip address cidr |
| cmd = authorizeSecurityGroupIngress.authorizeSecurityGroupIngressCmd() |
| cmd.securitygroupid = security_group.id |
| cmd.protocol = 'ICMP' |
| cmd.icmptype = "-1" |
| cmd.icmpcode = "-1" |
| cmd.cidrlist = cidr |
| icmp_ingress_rule = self.api_client.authorizeSecurityGroupIngress(cmd) |
| |
| # ping vm from test machine again |
| if os.name == 'nt': |
| result = subprocess.call( |
| ['ping', '-n', '1', vm.ipaddress]) |
| else: |
| result = subprocess.call( |
| ['ping', '-c', '1', vm.ipaddress]) |
| |
| self.assertEqual( |
| result, |
| 0, |
| "Ping should be successful" |
| ) |
| |
| # create ssh ingress security group rule with test machine ip address cidr |
| cmd = authorizeSecurityGroupIngress.authorizeSecurityGroupIngressCmd() |
| cmd.securitygroupid = security_group.id |
| cmd.protocol = 'TCP' |
| cmd.startport = 22 |
| cmd.endport = 22 |
| cmd.cidrlist = cidr |
| ssh_ingress_rule = self.api_client.authorizeSecurityGroupIngress(cmd) |
| |
| # ssh to vm from test machine |
| ssh = vm.get_ssh_client() |
| |
| # ping google from vm |
| res = ssh.execute("ping -c1 google.com") |
| self.assertEqual( |
| str(res).count("1 received"), |
| 1, |
| "Ping google.com from vm should be successful " |
| "as default security group permit all outcoming traffic" |
| ) |
| |
| # revoke icmp ingress security group rule |
| cmd = revokeSecurityGroupIngress.revokeSecurityGroupIngressCmd() |
| cmd.id = icmp_ingress_rule.ingressrule[0].ruleid |
| self.api_client.revokeSecurityGroupIngress(cmd) |
| |
| # revoke ssh ingress security group rule |
| cmd = revokeSecurityGroupIngress.revokeSecurityGroupIngressCmd() |
| cmd.id = ssh_ingress_rule.ingressrule[0].ruleid |
| self.api_client.revokeSecurityGroupIngress(cmd) |
| |
| @skipTestIf("not_tungsten_fabric_zone") |
| @attr(tags=["tungstenfabric"], required_hardware="false") |
| def test_01_test_ingress_rule_security_group_with_cidr(self): |
| |
| # Validate the following: |
| # 1. deploy 2 vm : vm1 and vm2 |
| # 2. create ssh ingress security group rule with test machine ip address cidr |
| # 3. ssh to vm1 |
| # 4. ssh to vm2 from vm1 ( should fail as ssh ingress security group rule ) |
| # 5. create ssh ingress security group rule with vm1 ip address cidr |
| # 4. ssh to vm2 from vm1 again ( should success as ssh ingress security group rule ) |
| |
| network_offering = NetworkOffering.create(self.api_client, self.services["network_offering"]) |
| network_offering.update(self.api_client, state='Enabled') |
| self.cleanup.append(network_offering) |
| |
| # create network |
| network = Network.create( |
| self.api_client, |
| self.services["networks"]["network1"], |
| networkofferingid=network_offering.id, |
| zoneid=self.zone.id, |
| accountid=self.account.name, |
| domainid=self.account.domainid |
| ) |
| self.cleanup.append(network) |
| |
| # create security group |
| security_group = SecurityGroup.create( |
| self.api_client, |
| self.services["security_groups"]["security_group1"], |
| account=self.account.name, |
| domainid=self.account.domainid |
| ) |
| self.cleanup.append(security_group) |
| |
| # create virtual machine 1 |
| vm1 = VirtualMachine.create( |
| self.api_client, |
| self.services["virtual_machines"]["vm1"], |
| serviceofferingid=self.service_offering.id, |
| templateid=self.template.id, |
| networkids=[str(network.id)], |
| zoneid=self.zone.id, |
| accountid=self.account.name, |
| domainid=self.account.domainid, |
| securitygroupids=[security_group.id], |
| mode='advanced' |
| ) |
| self.cleanup.append(vm1) |
| |
| # create virtual machine 2 |
| vm2 = VirtualMachine.create( |
| self.api_client, |
| self.services["virtual_machines"]["vm2"], |
| serviceofferingid=self.service_offering.id, |
| templateid=self.template.id, |
| networkids=[str(network.id)], |
| zoneid=self.zone.id, |
| accountid=self.account.name, |
| domainid=self.account.domainid, |
| securitygroupids=[security_group.id], |
| mode='advanced' |
| ) |
| self.cleanup.append(vm2) |
| |
| # create ssh ingress security group rule with test machine ip address cidr |
| test_machine_ip_address = self.getLocalMachineIpAddress() |
| cidr = test_machine_ip_address + "/32" |
| cmd = authorizeSecurityGroupIngress.authorizeSecurityGroupIngressCmd() |
| cmd.securitygroupid = security_group.id |
| cmd.protocol = 'TCP' |
| cmd.startport = 22 |
| cmd.endport = 22 |
| cmd.cidrlist = cidr |
| ingress_rule1 = self.api_client.authorizeSecurityGroupIngress(cmd) |
| |
| ssh = vm1.get_ssh_client() |
| res1 = ssh.execute("ssh %s@%s -v" % (vm2.username, vm2.ssh_ip)) |
| self.debug("Response is :%s" % res1) |
| self.assertFalse("connection established" in str(res1).lower(), "SSH to VM 2 should fail") |
| |
| # create ssh ingress security group rule with vm1 ip address cidr |
| cmd = authorizeSecurityGroupIngress.authorizeSecurityGroupIngressCmd() |
| cmd.securitygroupid = security_group.id |
| cmd.protocol = 'TCP' |
| cmd.startport = 22 |
| cmd.endport = 22 |
| cmd.cidrlist = vm1.ipaddress + "/32" |
| ingress_rule2 = self.api_client.authorizeSecurityGroupIngress(cmd) |
| |
| res2 = ssh.execute("ssh %s@%s -v" % (vm2.username, vm2.ssh_ip)) |
| self.debug("Response is :%s" % res2) |
| self.assertTrue("connection established" in str(res2).lower(), "SSH to VM 2 should success") |
| |
| # revoke ssh ingress security group rule 1 |
| cmd = revokeSecurityGroupIngress.revokeSecurityGroupIngressCmd() |
| cmd.id = ingress_rule1.ingressrule[0].ruleid |
| self.api_client.revokeSecurityGroupIngress(cmd) |
| |
| # revoke ssh ingress security group rule 2 |
| cmd = revokeSecurityGroupIngress.revokeSecurityGroupIngressCmd() |
| cmd.id = ingress_rule2.ingressrule[0].ruleid |
| self.api_client.revokeSecurityGroupIngress(cmd) |
| |
| @skipTestIf("not_tungsten_fabric_zone") |
| @attr(tags=["tungstenfabric"], required_hardware="false") |
| def test_01_test_ingress_rule_security_group_with_account(self): |
| |
| # Validate the following: |
| # 1. create network 1 |
| # 2. create security group 1 |
| # 3. create security group 2 |
| # 4. deploy vm 1 |
| # 5. deploy vm 2 |
| # 6. create ssh ingress security group 1 rule with test machine ip address cidr |
| # 7. ssh to vm1 from test machine |
| # 8. ping vm2 from vm1 ( should fail ) |
| # 9. create icmp ingress security group 2 rule with account and security group 1 |
| # 10. ping vm2 from vm1 again ( should success ) |
| # 11. create ssh ingress security group 2 rule with test machine ip address cidr |
| # 12. ssh to vm2 from test machine |
| # 13. ping vm1 from vm2 ( should fail ) |
| # 14. create icmp ingress security group 1 rule with account and security group 2 |
| # 15. ping vm1 from vm2 again ( should success ) |
| # 16. revoke ssh ingress security group rule 1 |
| # 17. revoke ssh ingress security group rule 2 |
| # 18. revoke icmp ingress security group rule 1 |
| # 19. revoke icmp ingress security group rule 2 |
| |
| network_offering = NetworkOffering.create(self.api_client, self.services["network_offering"]) |
| network_offering.update(self.api_client, state='Enabled') |
| self.cleanup.append(network_offering) |
| |
| # 1. create network 1 |
| network = Network.create( |
| self.api_client, |
| self.services["networks"]["network1"], |
| networkofferingid=network_offering.id, |
| zoneid=self.zone.id, |
| accountid=self.account.name, |
| domainid=self.account.domainid |
| ) |
| self.cleanup.append(network) |
| |
| # 2. create security group 1 |
| security_group1 = SecurityGroup.create( |
| self.api_client, |
| self.services["security_groups"]["security_group1"], |
| account=self.account.name, |
| domainid=self.account.domainid |
| ) |
| self.cleanup.append(security_group1) |
| |
| # 3. create security group 2 |
| security_group2 = SecurityGroup.create( |
| self.api_client, |
| self.services["security_groups"]["security_group2"], |
| account=self.account.name, |
| domainid=self.account.domainid |
| ) |
| self.cleanup.append(security_group2) |
| |
| # 4. deploy vm 1 |
| vm1 = VirtualMachine.create( |
| self.api_client, |
| self.services["virtual_machines"]["vm1"], |
| serviceofferingid=self.service_offering.id, |
| templateid=self.template.id, |
| networkids=[str(network.id)], |
| zoneid=self.zone.id, |
| accountid=self.account.name, |
| domainid=self.account.domainid, |
| securitygroupids=[security_group1.id], |
| mode='advanced' |
| ) |
| self.cleanup.append(vm1) |
| |
| # 5. deploy vm 2 |
| vm2 = VirtualMachine.create( |
| self.api_client, |
| self.services["virtual_machines"]["vm2"], |
| serviceofferingid=self.service_offering.id, |
| templateid=self.template.id, |
| networkids=[str(network.id)], |
| zoneid=self.zone.id, |
| accountid=self.account.name, |
| domainid=self.account.domainid, |
| securitygroupids=[security_group2.id], |
| mode='advanced' |
| ) |
| self.cleanup.append(vm2) |
| |
| # 6. create ssh ingress security group 1 rule with test machine ip address cidr |
| test_machine_ip_address = self.getLocalMachineIpAddress() |
| cidr = test_machine_ip_address + "/32" |
| cmd = authorizeSecurityGroupIngress.authorizeSecurityGroupIngressCmd() |
| cmd.securitygroupid = security_group1.id |
| cmd.protocol = 'TCP' |
| cmd.startport = 22 |
| cmd.endport = 22 |
| cmd.cidrlist = cidr |
| ssh_ingress_rule1 = self.api_client.authorizeSecurityGroupIngress(cmd) |
| |
| # 7. ssh to vm1 from test machine |
| ssh1 = vm1.get_ssh_client() |
| |
| # 8. ping vm2 from vm1 ( should fail ) |
| res1 = ssh1.execute("ping -c1 %s" % vm2.ipaddress) |
| self.assertEqual( |
| str(res1).count("0 received"), |
| 1, |
| "Ping vm2 from vm1 should be fail" |
| ) |
| |
| # 9. create icmp ingress security group 2 rule with account and security group 1 |
| cmd = authorizeSecurityGroupIngress.authorizeSecurityGroupIngressCmd() |
| cmd.securitygroupid = security_group2.id |
| cmd.protocol = 'ICMP' |
| cmd.icmptype = "-1" |
| cmd.icmpcode = "-1" |
| cmd.usersecuritygrouplist = [{'account':str(self.account.name),'group': str(security_group1.name)}] |
| icmp_ingress_rule1 = self.api_client.authorizeSecurityGroupIngress(cmd) |
| |
| # 10. ping vm2 from vm1 again ( should success ) |
| res2 = ssh1.execute("ping -c1 %s" % vm2.ipaddress) |
| self.assertEqual( |
| str(res2).count("1 received"), |
| 1, |
| "Ping vm2 from vm1 should be success" |
| ) |
| |
| # 11. create ssh ingress security group 2 rule with test machine ip address cidr |
| cmd = authorizeSecurityGroupIngress.authorizeSecurityGroupIngressCmd() |
| cmd.securitygroupid = security_group2.id |
| cmd.protocol = 'TCP' |
| cmd.startport = 22 |
| cmd.endport = 22 |
| cmd.cidrlist = cidr |
| ssh_ingress_rule2 = self.api_client.authorizeSecurityGroupIngress(cmd) |
| |
| # 12. ssh to vm2 from test machine |
| ssh2 = vm2.get_ssh_client() |
| |
| # 13. ping vm1 from vm2 ( should fail ) |
| res3 = ssh2.execute("ping -c1 %s" % vm1.ipaddress) |
| self.assertEqual( |
| str(res3).count("0 received"), |
| 1, |
| "Ping vm1 from vm2 should be fail" |
| ) |
| |
| # 14. create icmp ingress security group 1 rule with account and security group 2 |
| cmd = authorizeSecurityGroupIngress.authorizeSecurityGroupIngressCmd() |
| cmd.securitygroupid = security_group1.id |
| cmd.protocol = 'ICMP' |
| cmd.icmptype = "-1" |
| cmd.icmpcode = "-1" |
| cmd.usersecuritygrouplist = [{'account':str(self.account.name),'group': str(security_group2.name)}] |
| icmp_ingress_rule2 = self.api_client.authorizeSecurityGroupIngress(cmd) |
| |
| # 15. ping vm1 from vm2 again ( should success ) |
| res4 = ssh2.execute("ping -c1 %s" % vm2.ipaddress) |
| self.assertEqual( |
| str(res4).count("1 received"), |
| 1, |
| "Ping vm1 from vm2 should be success" |
| ) |
| |
| # 16. revoke ssh ingress security group rule 1 |
| cmd = revokeSecurityGroupIngress.revokeSecurityGroupIngressCmd() |
| cmd.id = ssh_ingress_rule1.ingressrule[0].ruleid |
| self.api_client.revokeSecurityGroupIngress(cmd) |
| |
| # 17. revoke ssh ingress security group rule 2 |
| cmd = revokeSecurityGroupIngress.revokeSecurityGroupIngressCmd() |
| cmd.id = ssh_ingress_rule2.ingressrule[0].ruleid |
| self.api_client.revokeSecurityGroupIngress(cmd) |
| |
| # 18. revoke icmp ingress security group rule 1 |
| cmd = revokeSecurityGroupIngress.revokeSecurityGroupIngressCmd() |
| cmd.id = icmp_ingress_rule1.ingressrule[0].ruleid |
| self.api_client.revokeSecurityGroupIngress(cmd) |
| |
| # 19. revoke icmp ingress security group rule 2 |
| cmd = revokeSecurityGroupIngress.revokeSecurityGroupIngressCmd() |
| cmd.id = icmp_ingress_rule2.ingressrule[0].ruleid |
| self.api_client.revokeSecurityGroupIngress(cmd) |
| |
| @skipTestIf("not_tungsten_fabric_zone") |
| @attr(tags=["tungstenfabric"], required_hardware="false") |
| def test_01_test_egress_rule_security_group_with_cidr(self): |
| |
| # Validate the following: |
| # 1. create network offering |
| # 2. create network 1 |
| # 3. create security group 1 |
| # 4. deploy vm1 |
| # 5. create ssh ingress security group 1 rule with test machine ip address cidr |
| # 6. ssh to vm1 from test machine |
| # 7. ping 8.8.8.8 from vm1 ( should success ) |
| # 8. ping 4.4.4.4 from vm1 ( should success ) |
| # 9. create icmp egress security group 1 rule with 8.8.8.8/32 cidr |
| # 10. ping 8.8.8.8 from vm 1 ( should success ) |
| # 11. ping 8.8.4.4 from vm1 ( should fail ) |
| # 12. create icmp egress security group 1 rule with 8.8.4.4/32 cidr |
| # 13. ping 8.8.4.4 from vm1 ( should success ) |
| |
| # 1. create network offering |
| network_offering = NetworkOffering.create( |
| self.api_client, |
| self.services["network_offering"] |
| ) |
| network_offering.update(self.api_client, state='Enabled') |
| self.cleanup.append(network_offering) |
| |
| # 2. create network 1 |
| network = Network.create( |
| self.api_client, |
| self.services["networks"]["network1"], |
| networkofferingid=network_offering.id, |
| zoneid=self.zone.id, |
| accountid=self.account.name, |
| domainid=self.account.domainid |
| ) |
| self.cleanup.append(network) |
| |
| # 3. create security group 1 |
| security_group1 = SecurityGroup.create( |
| self.api_client, |
| self.services["security_groups"]["security_group1"], |
| account=self.account.name, |
| domainid=self.account.domainid |
| ) |
| self.cleanup.append(security_group1) |
| |
| # 4. deploy vm 1 |
| vm1 = VirtualMachine.create( |
| self.api_client, |
| self.services["virtual_machines"]["vm1"], |
| serviceofferingid=self.service_offering.id, |
| templateid=self.template.id, |
| networkids=[str(network.id)], |
| zoneid=self.zone.id, |
| accountid=self.account.name, |
| domainid=self.account.domainid, |
| securitygroupids=[security_group1.id], |
| mode='advanced' |
| ) |
| self.cleanup.append(vm1) |
| |
| # 5. create ssh ingress security group 1 rule with test machine ip address cidr |
| test_machine_ip_address = self.getLocalMachineIpAddress() |
| cidr = test_machine_ip_address + "/32" |
| cmd = authorizeSecurityGroupIngress.authorizeSecurityGroupIngressCmd() |
| cmd.securitygroupid = security_group1.id |
| cmd.protocol = 'TCP' |
| cmd.startport = 22 |
| cmd.endport = 22 |
| cmd.cidrlist = cidr |
| ssh_ingress_rule1 = self.api_client.authorizeSecurityGroupIngress(cmd) |
| |
| # 6. ssh to vm1 from test machine |
| ssh1 = vm1.get_ssh_client() |
| |
| # 7. ping 8.8.8.8 from vm 1 ( should success ) |
| res1 = ssh1.execute("ping -c1 8.8.8.8") |
| self.assertEqual( |
| str(res1).count("1 received"), |
| 1, |
| "Ping 8.8.8.8 from vm1 should be success" |
| ) |
| |
| # 8. ping 8.8.4.4 from vm1 ( should success ) |
| res2 = ssh1.execute("ping -c1 8.8.4.4") |
| self.assertEqual( |
| str(res2).count("1 received"), |
| 1, |
| "Ping 8.8.4.4 from vm1 should be success" |
| ) |
| |
| # 9. create icmp egress security group 1 rule with 8.8.8.8/32 cidr |
| cmd = authorizeSecurityGroupEgress.authorizeSecurityGroupEgressCmd() |
| cmd.securitygroupid = security_group1.id |
| cmd.protocol = 'ICMP' |
| cmd.icmptype = "-1" |
| cmd.icmpcode = "-1" |
| cmd.cidrlist = '8.8.8.8/32' |
| icmp_egress_rule1 = self.api_client.authorizeSecurityGroupEgress(cmd) |
| |
| # 10. ping 8.8.8.8 from vm 1 ( should success ) |
| res3 = ssh1.execute("ping -c1 8.8.8.8") |
| self.assertEqual( |
| str(res3).count("1 received"), |
| 1, |
| "Ping 8.8.8.8 from vm1 should be success" |
| ) |
| |
| # 11. ping 8.8.4.4 from vm1 ( should fail ) |
| res4 = ssh1.execute("ping -c1 8.8.4.4") |
| self.assertEqual( |
| str(res4).count("0 received"), |
| 1, |
| "Ping 8.8.4.4 from vm1 should be fail" |
| ) |
| |
| # 12. create icmp egress security group 1 rule with 8.8.4.4 cidr |
| cmd = authorizeSecurityGroupEgress.authorizeSecurityGroupEgressCmd() |
| cmd.securitygroupid = security_group1.id |
| cmd.protocol = 'ICMP' |
| cmd.icmptype = "-1" |
| cmd.icmpcode = "-1" |
| cmd.cidrlist = "8.8.4.4/32" |
| icmp_egress_rule2 = self.api_client.authorizeSecurityGroupEgress(cmd) |
| |
| # 13. ping vm2 from vm1 ( should success ) |
| res5 = ssh1.execute("ping -c1 8.8.4.4") |
| self.assertEqual( |
| str(res5).count("1 received"), |
| 1, |
| "Ping 8.8.4.4 from vm1 should be success" |
| ) |
| |
| # 14. revoke ssh ingress security group rule 1 |
| cmd = revokeSecurityGroupIngress.revokeSecurityGroupIngressCmd() |
| cmd.id = ssh_ingress_rule1.ingressrule[0].ruleid |
| self.api_client.revokeSecurityGroupIngress(cmd) |
| |
| # 15. revoke icmp egress security group rule 1 |
| cmd = revokeSecurityGroupEgress.revokeSecurityGroupEgressCmd() |
| cmd.id = icmp_egress_rule1.egressrule[0].ruleid |
| self.api_client.revokeSecurityGroupEgress(cmd) |
| |
| # 16. revoke icmp egress security group rule 2 |
| cmd = revokeSecurityGroupEgress.revokeSecurityGroupEgressCmd() |
| cmd.id = icmp_egress_rule2.egressrule[0].ruleid |
| self.api_client.revokeSecurityGroupEgress(cmd) |
| |
| @skipTestIf("not_tungsten_fabric_zone") |
| @attr(tags=["tungstenfabric"], required_hardware="false") |
| def test_01_test_egress_rule_security_group_with_account(self): |
| |
| # Validate the following: |
| # 1. create network offering |
| # 2. create network 1 |
| # 3. create security group 1 |
| # 4. create security group 2 |
| # 5. create security group 3 |
| # 6. deploy vm1 |
| # 7. deploy vm2 |
| # 8. deploy vm3 |
| # 9. create ssh ingress security group 1 rule with test machine ip address cidr |
| # 10. create icmp ingress security group 2 rule with security group 1 |
| # 11. create icmp ingress security group 3 rule with security group 1 |
| # 12. ssh to vm1 from test machine |
| # 13. ping vm2 from vm1 ( should success ) |
| # 14. ping vm3 from vm1 ( should success ) |
| # 15. create icmp egress security group 1 rule with security group 2 |
| # 16. ping vm2 from vm1 ( should success ) |
| # 17. ping vm3 from vm1 ( should fail ) |
| # 18. create icmp egress security group 1 rule with security group 3 |
| # 19. ping vm2 from vm1 ( should success ) |
| # 20. ping vm3 from vm1 ( should success ) |
| # 21. revoke ssh ingress security group rule 1 |
| # 22. revoke icmp ingress security group rule 1 |
| # 23. revoke icmp ingress security group rule 2 |
| # 24. revoke icmp egress security group rule 1 |
| # 25. revoke icmp egress security group rule 2 |
| |
| # 1. create network offering |
| network_offering = NetworkOffering.create( |
| self.api_client, |
| self.services["network_offering"] |
| ) |
| network_offering.update(self.api_client, state='Enabled') |
| self.cleanup.append(network_offering) |
| |
| # 2. create network 1 |
| network = Network.create( |
| self.api_client, |
| self.services["networks"]["network1"], |
| networkofferingid=network_offering.id, |
| zoneid=self.zone.id, |
| accountid=self.account.name, |
| domainid=self.account.domainid |
| ) |
| self.cleanup.append(network) |
| |
| # 3. create security group 1 |
| security_group1 = SecurityGroup.create( |
| self.api_client, |
| self.services["security_groups"]["security_group1"], |
| account=self.account.name, |
| domainid=self.account.domainid |
| ) |
| self.cleanup.append(security_group1) |
| |
| # 4. create security group 2 |
| security_group2 = SecurityGroup.create( |
| self.api_client, |
| self.services["security_groups"]["security_group2"], |
| account=self.account.name, |
| domainid=self.account.domainid |
| ) |
| self.cleanup.append(security_group2) |
| |
| # 5. create security group 3 |
| security_group3 = SecurityGroup.create( |
| self.api_client, |
| self.services["security_groups"]["security_group3"], |
| account=self.account.name, |
| domainid=self.account.domainid |
| ) |
| self.cleanup.append(security_group3) |
| |
| # 6. deploy vm1 |
| vm1 = VirtualMachine.create( |
| self.api_client, |
| self.services["virtual_machines"]["vm1"], |
| serviceofferingid=self.service_offering.id, |
| templateid=self.template.id, |
| networkids=[str(network.id)], |
| zoneid=self.zone.id, |
| accountid=self.account.name, |
| domainid=self.account.domainid, |
| securitygroupids=[security_group1.id], |
| mode='advanced' |
| ) |
| self.cleanup.append(vm1) |
| |
| # 7. deploy vm2 |
| vm2 = VirtualMachine.create( |
| self.api_client, |
| self.services["virtual_machines"]["vm2"], |
| serviceofferingid=self.service_offering.id, |
| templateid=self.template.id, |
| networkids=[str(network.id)], |
| zoneid=self.zone.id, |
| accountid=self.account.name, |
| domainid=self.account.domainid, |
| securitygroupids=[security_group2.id], |
| mode='advanced' |
| ) |
| self.cleanup.append(vm2) |
| |
| # 8. deploy vm3 |
| vm3 = VirtualMachine.create( |
| self.api_client, |
| self.services["virtual_machines"]["vm3"], |
| serviceofferingid=self.service_offering.id, |
| templateid=self.template.id, |
| networkids=[str(network.id)], |
| zoneid=self.zone.id, |
| accountid=self.account.name, |
| domainid=self.account.domainid, |
| securitygroupids=[security_group3.id], |
| mode='advanced' |
| ) |
| self.cleanup.append(vm3) |
| |
| # 9. create ssh ingress security group 1 rule with test machine ip address cidr |
| test_machine_ip_address = self.getLocalMachineIpAddress() |
| cidr = test_machine_ip_address + "/32" |
| cmd = authorizeSecurityGroupIngress.authorizeSecurityGroupIngressCmd() |
| cmd.securitygroupid = security_group1.id |
| cmd.protocol = 'TCP' |
| cmd.startport = 22 |
| cmd.endport = 22 |
| cmd.cidrlist = cidr |
| ssh_ingress_rule1 = self.api_client.authorizeSecurityGroupIngress(cmd) |
| |
| # 10. create icmp ingress security group 2 rule with security group 1 |
| cmd = authorizeSecurityGroupIngress.authorizeSecurityGroupIngressCmd() |
| cmd.securitygroupid = security_group2.id |
| cmd.protocol = 'ICMP' |
| cmd.icmptype = "-1" |
| cmd.icmpcode = "-1" |
| cmd.usersecuritygrouplist = [{'account':str(self.account.name),'group': str(security_group1.name)}] |
| icmp_ingress_rule1 = self.api_client.authorizeSecurityGroupIngress(cmd) |
| |
| # 11. create icmp ingress security group 3 rule with security group 1 |
| cmd = authorizeSecurityGroupIngress.authorizeSecurityGroupIngressCmd() |
| cmd.securitygroupid = security_group3.id |
| cmd.protocol = 'ICMP' |
| cmd.icmptype = "-1" |
| cmd.icmpcode = "-1" |
| cmd.usersecuritygrouplist = [{'account':str(self.account.name),'group': str(security_group1.name)}] |
| icmp_ingress_rule2 = self.api_client.authorizeSecurityGroupIngress(cmd) |
| |
| # 12. ssh to vm1 from test machine |
| ssh1 = vm1.get_ssh_client() |
| |
| # 13. ping vm2 from vm1 ( should success ) |
| res1 = ssh1.execute("ping -c1 %s" % vm2.ipaddress) |
| self.assertEqual( |
| str(res1).count("1 received"), |
| 1, |
| "Ping vm2 from vm1 should be success" |
| ) |
| |
| # 14. ping vm3 from vm1 ( should success ) |
| res2 = ssh1.execute("ping -c1 %s" % vm3.ipaddress) |
| self.assertEqual( |
| str(res2).count("1 received"), |
| 1, |
| "Ping vm3 from vm1 should be success" |
| ) |
| |
| # 15. create icmp egress security group 1 rule with security group 2 |
| cmd = authorizeSecurityGroupEgress.authorizeSecurityGroupEgressCmd() |
| cmd.securitygroupid = security_group1.id |
| cmd.protocol = 'ICMP' |
| cmd.icmptype = "-1" |
| cmd.icmpcode = "-1" |
| cmd.usersecuritygrouplist = [{'account':str(self.account.name),'group': str(security_group2.name)}] |
| icmp_egress_rule1 = self.api_client.authorizeSecurityGroupEgress(cmd) |
| |
| # 16. ping vm2 from vm1 ( should success ) |
| res3 = ssh1.execute("ping -c1 %s" % vm2.ipaddress) |
| self.assertEqual( |
| str(res3).count("1 received"), |
| 1, |
| "Ping vm2 from vm1 should be success" |
| ) |
| |
| # 17. ping vm3 from vm1 ( should fail ) |
| res4 = ssh1.execute("ping -c1 %s" % vm3.ipaddress) |
| self.assertEqual( |
| str(res4).count("0 received"), |
| 1, |
| "Ping vm3 from vm1 should be fail" |
| ) |
| |
| # 18. create icmp egress security group 1 rule with security group 3 |
| cmd = authorizeSecurityGroupEgress.authorizeSecurityGroupEgressCmd() |
| cmd.securitygroupid = security_group1.id |
| cmd.protocol = 'ICMP' |
| cmd.icmptype = "-1" |
| cmd.icmpcode = "-1" |
| cmd.usersecuritygrouplist = [{'account':str(self.account.name),'group': str(security_group3.name)}] |
| icmp_egress_rule2 = self.api_client.authorizeSecurityGroupEgress(cmd) |
| |
| # 19. ping vm2 from vm1 ( should success ) |
| res5 = ssh1.execute("ping -c1 %s" % vm2.ipaddress) |
| self.assertEqual( |
| str(res5).count("1 received"), |
| 1, |
| "Ping vm2 from vm1 should be success" |
| ) |
| |
| # 20. ping vm3 from vm1 ( should success ) |
| res6 = ssh1.execute("ping -c1 %s" % vm3.ipaddress) |
| self.assertEqual( |
| str(res6).count("1 received"), |
| 1, |
| "Ping vm3 from vm1 should be success" |
| ) |
| |
| # 21. revoke ssh ingress security group rule 1 |
| cmd = revokeSecurityGroupIngress.revokeSecurityGroupIngressCmd() |
| cmd.id = ssh_ingress_rule1.ingressrule[0].ruleid |
| self.api_client.revokeSecurityGroupIngress(cmd) |
| |
| # 22. revoke icmp ingress security group rule 1 |
| cmd = revokeSecurityGroupIngress.revokeSecurityGroupIngressCmd() |
| cmd.id = icmp_ingress_rule1.ingressrule[0].ruleid |
| self.api_client.revokeSecurityGroupIngress(cmd) |
| |
| # 23. revoke icmp ingress security group rule 2 |
| cmd = revokeSecurityGroupIngress.revokeSecurityGroupIngressCmd() |
| cmd.id = icmp_ingress_rule2.ingressrule[0].ruleid |
| self.api_client.revokeSecurityGroupIngress(cmd) |
| |
| # 24. revoke icmp egress security group rule 1 |
| cmd = revokeSecurityGroupEgress.revokeSecurityGroupEgressCmd() |
| cmd.id = icmp_egress_rule1.egressrule[0].ruleid |
| self.api_client.revokeSecurityGroupEgress(cmd) |
| |
| # 25. revoke icmp egress security group rule 2 |
| cmd = revokeSecurityGroupEgress.revokeSecurityGroupEgressCmd() |
| cmd.id = icmp_egress_rule2.egressrule[0].ruleid |
| self.api_client.revokeSecurityGroupEgress(cmd) |