blob: 124be678965c99531fb3a13adb1633b3bf7aa62e [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
""" Test for IPv4 Routed mode"""
import datetime
import logging
import random
import time
from marvin.cloudstackTestCase import cloudstackTestCase
from marvin.lib.base import ZoneIpv4Subnet, Domain, Account, ServiceOffering, NetworkOffering, VpcOffering, Network, \
Ipv4SubnetForGuestNetwork, VirtualMachine, VPC, NetworkACLList, NetworkACL, RoutingFirewallRule, Template, ASNRange, \
BgpPeer, Router
from marvin.lib.common import get_domain, get_zone, list_routers, list_hosts
from marvin.lib.utils import get_host_credentials, get_process_status
from nose.plugins.attrib import attr
# nftables set literal expected in the VR for an ICMP rule created with
# icmptype/icmpcode = -1; it is matched verbatim against `nft list chain` output.
ICMPv4_ALL_TYPES = "{ %s }" % ", ".join((
    "echo-reply", "destination-unreachable", "source-quench", "redirect", "echo-request",
    "router-advertisement", "router-solicitation", "time-exceeded", "parameter-problem", "timestamp-request",
    "timestamp-reply", "info-request", "info-reply", "address-mask-request", "address-mask-reply",
))

# Zone-level IPv4 subnets. The third octet is randomised, presumably so that
# repeated runs against the same zone do not collide -- TODO confirm.
SUBNET_PREFIX = "172.30."
SUBNET_1_PREFIX = "%s%d" % (SUBNET_PREFIX, random.randrange(100, 150))
SUBNET_2_PREFIX = "%s%d" % (SUBNET_PREFIX, random.randrange(151, 199))

# Prefixes used for VPC CIDRs and for isolated networks with an explicit CIDR.
VPC_CIDR_PREFIX = "172.31"  # .0 to .16
NETWORK_CIDR_PREFIX = "%s.100" % VPC_CIDR_PREFIX
NETWORK_CIDR_PREFIX_DYNAMIC = "%s.101" % VPC_CIDR_PREFIX

# Ping polling budget: number of attempts and seconds slept between attempts.
MAX_RETRIES = 30
WAIT_INTERVAL = 5

# Resources created by test_06 / test_07 and shared with later tests through
# module globals; they stay None until those tests have run.
test_network = test_network_vm = None
test_vpc = test_vpc_tier = test_vpc_vm = None
test_network_acl = None

# AS number range registered for the zone, and AS numbers / addresses / passwords
# used for BGP peers.
START_ASN = 888800
END_ASN = 888888
ASN_1 = random.randrange(1, 200) + 900100
ASN_2 = random.randrange(0, 200) + 900301
IP4_ADDR_1 = "10.0.53.10"
IP4_ADDR_2 = "10.0.53.11"
PASSWORD_1 = "testpassword1"
PASSWORD_2 = "testpassword2"
# Offering definitions passed to NetworkOffering.create / VpcOffering.create.
# All of them use networkmode ROUTED; the *_DYNAMIC variants additionally set
# routingmode "Dynamic".

# Isolated network offering, static routing, services provided by the VirtualRouter.
NETWORK_OFFERING = {
    "name": "Test Network offering - Routed mode",
    "displaytext": "Test Network offering - Routed mode",
    "networkmode": "ROUTED",
    "guestiptype": "Isolated",
    "supportedservices": "Dhcp,Dns,UserData,Firewall",
    "traffictype": "GUEST",
    "availability": "Optional",
    "egress_policy": "true",
    "serviceProviderList": {
        "Dhcp": "VirtualRouter",
        "Dns": "VirtualRouter",
        "UserData": "VirtualRouter",
        "Firewall": "VirtualRouter"
    }
}

# VPC offering, static routing.
VPC_OFFERING = {
    "name": "Test VPC offering - Routed mode",
    "displaytext": "Test VPC offering - Routed mode",
    "networkmode": "ROUTED",
    "supportedservices": "Dhcp,Dns,UserData,NetworkACL"
}

# VPC tier offering, static routing, services provided by the VpcVirtualRouter.
VPC_NETWORK_OFFERING = {
    "name": "Test VPC Network offering - Routed mode",
    "displaytext": "Test VPC Network offering - Routed mode",
    "networkmode": "ROUTED",
    "guestiptype": "Isolated",
    "supportedservices": "Dhcp,Dns,UserData,NetworkACL",
    "traffictype": "GUEST",
    "availability": "Optional",
    "serviceProviderList": {
        "Dhcp": "VpcVirtualRouter",
        "Dns": "VpcVirtualRouter",
        "UserData": "VpcVirtualRouter",
        "NetworkACL": "VpcVirtualRouter"
    }
}

# Isolated network offering, dynamic routing.
NETWORK_OFFERING_DYNAMIC = {
    "name": "Test Network offering - Dynamic Routed mode",
    "displaytext": "Test Network offering - Dynamic Routed mode",
    "networkmode": "ROUTED",
    "routingmode": "Dynamic",
    "guestiptype": "Isolated",
    "supportedservices": "Dhcp,Dns,UserData,Firewall",
    "traffictype": "GUEST",
    "availability": "Optional",
    "egress_policy": "true",
    "serviceProviderList": {
        "Dhcp": "VirtualRouter",
        "Dns": "VirtualRouter",
        "UserData": "VirtualRouter",
        "Firewall": "VirtualRouter"
    }
}

# VPC offering, dynamic routing.
# The name/displaytext used to be a copy of VPC_OFFERING's, which made the two
# offerings indistinguishable by name; they now follow the "Dynamic Routed mode"
# wording used by the other *_DYNAMIC offerings.
VPC_OFFERING_DYNAMIC = {
    "name": "Test VPC offering - Dynamic Routed mode",
    "displaytext": "Test VPC offering - Dynamic Routed mode",
    "networkmode": "ROUTED",
    "routingmode": "Dynamic",
    "supportedservices": "Dhcp,Dns,UserData,NetworkACL"
}

# VPC tier offering, dynamic routing.
VPC_NETWORK_OFFERING_DYNAMIC = {
    "name": "Test VPC Network offering - Dynamic Routed mode",
    "displaytext": "Test VPC Network offering - Dynamic Routed mode",
    "networkmode": "ROUTED",
    "routingmode": "Dynamic",
    "guestiptype": "Isolated",
    "supportedservices": "Dhcp,Dns,UserData,NetworkACL",
    "traffictype": "GUEST",
    "availability": "Optional",
    "serviceProviderList": {
        "Dhcp": "VpcVirtualRouter",
        "Dns": "VpcVirtualRouter",
        "UserData": "VpcVirtualRouter",
        "NetworkACL": "VpcVirtualRouter"
    }
}
class TestIpv4Routing(cloudstackTestCase):
@classmethod
def setUpClass(cls):
    """Create the fixtures shared by every test in this class.

    In order: API/DB clients, domain and zone lookups, a debug logger, a
    registered template, a zone IPv4 subnet, a zone ASN range, a small
    service offering, the routed-mode network/VPC offerings (each enabled
    right after creation), a sub-domain, a regular account in that
    sub-domain, and an API client for that account's first user.
    Every created resource is appended to ``cls._cleanup``.
    """
    test_client = super(TestIpv4Routing, cls).getClsTestClient()
    cls.services = test_client.getParsedTestDataConfig()
    cls.apiclient = test_client.getApiClient()
    cls.dbclient = test_client.getDbConnection()
    cls.hypervisor = test_client.getHypervisorInfo()
    cls.domain = get_domain(cls.apiclient)
    cls.zone = get_zone(cls.apiclient)
    cls._cleanup = []

    cls.logger = logging.getLogger("TestIpv4Routing")
    cls.stream_handler = logging.StreamHandler()
    cls.logger.setLevel(logging.DEBUG)
    cls.logger.addHandler(cls.stream_handler)

    # 0. register template
    hypervisor_key = cls.hypervisor.lower()
    cls.template = Template.register(
        cls.apiclient,
        cls.services["test_templates"][hypervisor_key],
        zoneid=cls.zone.id,
        hypervisor=hypervisor_key,
    )
    cls.template.download(cls.apiclient)
    cls._cleanup.append(cls.template)

    # 1.1 create subnet for zone
    cls.subnet_1 = ZoneIpv4Subnet.create(cls.apiclient, zoneid=cls.zone.id, subnet=SUBNET_1_PREFIX + ".0/24")
    cls._cleanup.append(cls.subnet_1)

    # 1.2 create ASN range for zone
    cls.asnrange = ASNRange.create(cls.apiclient, zoneid=cls.zone.id, startasn=START_ASN, endasn=END_ASN)
    cls._cleanup.append(cls.asnrange)

    # 2. Create small service offering
    cls.service_offering = ServiceOffering.create(cls.apiclient, cls.services["service_offerings"]["small"])
    cls._cleanup.append(cls.service_offering)

    # 3. Create network and vpc offerings with routed mode.
    # Each row is (class attribute, offering class, definition); the rows are
    # processed in order: create, register for cleanup, then enable.
    routed_offerings = (
        ("network_offering_isolated", NetworkOffering, NETWORK_OFFERING),                # 3.1 static network
        ("vpc_offering", VpcOffering, VPC_OFFERING),                                     # 3.2 static VPC
        ("vpc_network_offering", NetworkOffering, VPC_NETWORK_OFFERING),                 # 3.3 static VPC tier
        ("network_offering_dynamic", NetworkOffering, NETWORK_OFFERING_DYNAMIC),         # 3.4 dynamic network
        ("vpc_network_offering_dynamic", NetworkOffering, VPC_NETWORK_OFFERING_DYNAMIC),  # 3.5 dynamic VPC tier
    )
    for attribute, offering_class, definition in routed_offerings:
        offering = offering_class.create(cls.apiclient, definition)
        setattr(cls, attribute, offering)
        cls._cleanup.append(offering)
        offering.update(cls.apiclient, state='Enabled')

    # 4. Create sub-domain
    cls.sub_domain = Domain.create(cls.apiclient, cls.services["acl"]["domain1"])
    cls._cleanup.append(cls.sub_domain)

    # 5. Create regular user
    cls.regular_user = Account.create(
        cls.apiclient,
        cls.services["acl"]["accountD11A"],
        domainid=cls.sub_domain.id,
    )
    cls._cleanup.append(cls.regular_user)

    # 6. Create api clients for regular user
    cls.regular_user_user = cls.regular_user.user[0]
    cls.regular_user_apiclient = cls.testClient.getUserApiClient(
        cls.regular_user_user.username,
        cls.sub_domain.name,
    )
@classmethod
def tearDownClass(cls):
    """Delegate class-level teardown to the parent test case.

    NOTE(review): the parent presumably releases everything collected in
    ``cls._cleanup`` -- confirm in cloudstackTestCase.
    """
    super(TestIpv4Routing, cls).tearDownClass()
@classmethod
def message(cls, msg):
cls.logger.debug("====== " + str(datetime.datetime.now()) + " " + msg + " ======")
def setUp(self):
    """Fetch the API client from the test client and start with an empty per-test cleanup list."""
    self.apiclient = self.testClient.getApiClient()
    # Resources appended here are scoped to a single test; class-wide ones go to _cleanup.
    self.cleanup = []
def tearDown(self):
    """Delegate per-test teardown to the parent test case.

    NOTE(review): the parent presumably releases everything collected in
    ``self.cleanup`` -- confirm in cloudstackTestCase.
    """
    super(TestIpv4Routing, self).tearDown()
def get_router(self, networkid=None, vpcid=None):
    """Return the first virtual router of a VPC or of a network.

    :param networkid: network to look routers up for; used only when
        ``vpcid`` is falsy
    :param vpcid: VPC to look routers up for; takes precedence
    :return: first entry of the list_routers response
    Fails the test when the response is not a list.
    """
    # list router, filtering by VPC when one is given and by network otherwise
    filters = {"vpcid": vpcid} if vpcid else {"networkid": networkid}
    filters["listall"] = "true"
    routers = list_routers(self.apiclient, **filters)
    self.assertEqual(
        isinstance(routers, list),
        True,
        "list routers response should return a valid list"
    )
    return routers[0]
def run_command_in_router(self, router, command):
    """Run ``command`` inside the virtual router and return its output as a string.

    For vmware/hyperv routers the command is sent through the management
    server connection; otherwise through the host the router runs on,
    using the credentials from the Marvin configuration.

    :param router: router object (zoneid, hostid, hypervisor, linklocalip are read)
    :param command: shell command to execute in the router
    :return: ``str()`` of the get_process_status result
    Skips the test when the Marvin configuration has no credentials for the host.
    """
    # get host of router
    candidates = list_hosts(
        self.apiclient,
        zoneid=router.zoneid,
        type='Routing',
        state='Up',
        id=router.hostid
    )
    self.assertEqual(
        isinstance(candidates, list),
        True,
        "Check list host returns a valid list"
    )
    router_host = candidates[0]

    # run command
    output = ''
    via_management_server = router.hypervisor.lower() in ('vmware', 'hyperv')
    if not via_management_server:
        try:
            router_host.user, router_host.passwd = get_host_credentials(self.config, router_host.ipaddress)
            output = get_process_status(
                router_host.ipaddress,
                22,
                router_host.user,
                router_host.passwd,
                router.linklocalip,
                command
            )
        except KeyError:
            self.skipTest("Marvin configuration has no host credentials to check router services")
    else:
        connection = self.apiclient.connection
        output = get_process_status(
            connection.mgtSvr,
            22,
            connection.user,
            connection.passwd,
            router.linklocalip,
            command,
            hypervisor=router.hypervisor
        )

    text = str(output)
    self.message("VR command (%s) result: (%s)" % (command, text))
    return text
def rebootRouter(self, router):
    """Reboot ``router`` through the API and fail the test if the call raises.

    :param router: router object; only ``router.id`` is read
    """
    try:
        Router.reboot(
            self.apiclient,
            id=router.id
        )
    except Exception as e:
        # Any error from the reboot call becomes a test failure carrying the router id.
        self.fail("Failed to reboot the virtual router: %s, %s" % (router.id, e))
def createNetworkAclRule(self, rule):
    """Create a network ACL rule described by ``rule`` and return it.

    The rule is added to the ACL list held in the module global
    ``test_network_acl``, which is None until test_07 has created it.

    :param rule: dict passed to NetworkACL.create as ``services``
    """
    return NetworkACL.create(self.apiclient,
                             services=rule,
                             aclid=test_network_acl.id)
def createIpv4RoutingFirewallRule(self, rule):
    """Create an IPv4 routing firewall rule described by ``rule`` and return it.

    The rule is attached to the network held in the module global
    ``test_network``, which is None until test_06 has created it.

    :param rule: dict passed to RoutingFirewallRule.create as ``services``
    """
    return RoutingFirewallRule.create(self.apiclient,
                                      services=rule,
                                      networkid=test_network.id)
def verifyNftablesRulesInRouter(self, router, rules):
    """Check the presence or absence of nftables rules inside a router.

    The ``ip4_acl`` table is inspected for VPC routers and ``ip4_firewall``
    for all others.

    :param router: router object; ``router.vpcid`` selects the table
    :param rules: list of dicts with keys ``chain`` (chain to list),
        ``rule`` (text searched in the chain listing) and optional
        ``exists`` (defaults to True; a falsy value means the rule must be absent)
    Fails the test on the first rule that violates its expectation.
    """
    table = "ip4_acl" if router.vpcid else "ip4_firewall"
    for entry in rules:
        listing = self.run_command_in_router(router, "nft list chain ip %s %s" % (table, entry["chain"]))
        must_exist = bool(entry.get("exists", True))
        present = entry["rule"] in listing
        if must_exist and not present:
            self.fail("The nftables rule (%s) should exist but is not found in the VR !!!" % entry["rule"])
        if present and not must_exist:
            self.fail("The nftables rule (%s) should not exist but is found in the VR !!!" % entry["rule"])
    self.message("The nftables rules look good so far.")
def verifyPingFromRouter(self, router, vm, expected=True, retries=2):
    """Ping ``vm`` from inside ``router`` and check the outcome against ``expected``.

    One ``ping -c1 -W1`` is sent per attempt. A reply ends the check at
    once; without a reply the ping is repeated, sleeping WAIT_INTERVAL
    seconds between attempts, until ``retries`` attempts have been used.

    :param router: router object to run the ping in (``router.name`` is used in messages)
    :param vm: target; ``vm.ipaddress`` is pinged
    :param expected: True when the VM must answer, False when it must stay unreachable
    :param retries: maximum number of ping attempts
    Fails the test when
      * a reply arrives although ``expected`` is False,
      * no reply arrives within ``retries`` attempts although ``expected`` is True,
      * running the command raises an unexpected error.
    A skip raised while running the command is passed through unchanged.
    """
    # Function-scope import: only needed to tell a skip apart from a real error.
    import unittest

    cmd_ping_vm = "ping -c1 -W1 %s" % vm.ipaddress
    while retries > 0:
        # Keep the try body minimal so that self.fail() below is never swallowed.
        try:
            result = self.run_command_in_router(router, cmd_ping_vm)
        except unittest.SkipTest:
            # run_command_in_router skips when host credentials are missing;
            # that must stay a skip rather than turn into a failure.
            raise
        except Exception as ex:
            self.fail("Failed to ping vm %s from router %s: %s" % (vm.ipaddress, router.name, ex))

        if "0 packets received" not in result:
            # A reply arrived. The previous implementation returned here even
            # when expected was False, so "should fail" checks could never fail.
            if not expected:
                self.fail("ping vm %s from router %s works, however it is unexpected !!!"
                          % (vm.ipaddress, router.name))
            self.message("packets are received, looks good")
            return

        retries = retries - 1
        self.message("No packets received, remaining retries %s" % retries)
        if retries > 0:
            time.sleep(WAIT_INTERVAL)

    # No attempt produced a reply.
    if expected:
        self.fail("Failed to ping vm %s from router %s, which is expected to work !!!"
                  % (vm.ipaddress, router.name))
def verifyFrrConf(self, router, configs):
    """Check the presence or absence of snippets in the router's /etc/frr/frr.conf.

    The file is read once and every entry is checked against that content.

    :param router: router object to read the file from
    :param configs: list of dicts with key ``config`` (text searched in the
        file) and optional ``exists`` (defaults to True; a falsy value means
        the text must be absent)
    Fails the test on the first entry that violates its expectation.
    """
    frr_conf = self.run_command_in_router(router, "cat /etc/frr/frr.conf")
    for entry in configs:
        must_exist = bool(entry.get("exists", True))
        present = entry["config"] in frr_conf
        if must_exist and not present:
            self.fail("The frr config (%s) should exist but is not found in the VR !!!" % entry["config"])
        if present and not must_exist:
            self.fail("The frr config (%s) should not exist but is found in the VR !!!" % entry["config"])
    self.message("The frr config look good so far.")
@attr(tags=['advanced'], required_hardware=False)
def test_01_zone_subnet(self):
    """ Test for subnet for zone

    # 1. Create subnet
    # 2. List subnet
    # 3. Update subnet
    # 4. dedicate subnet to domain
    # 5. released dedicated subnet
    # 6. dedicate subnet to sub-domain/account
    # 7. released dedicated subnet
    # 8. delete subnet
    """
    self.message("Running test_01_zone_subnet")
    initial_cidr = SUBNET_2_PREFIX + ".0/24"
    updated_cidr = SUBNET_2_PREFIX + ".0/25"

    def lookup():
        # Re-read the subnet under test by its id.
        return ZoneIpv4Subnet.list(self.apiclient, id=self.subnet_2.id)

    def is_single(found):
        # True when the listing is a list holding exactly one entry.
        return isinstance(found, list) and len(found) == 1

    # 1. Create subnet
    self.subnet_2 = ZoneIpv4Subnet.create(self.apiclient, zoneid=self.zone.id, subnet=initial_cidr)
    self.cleanup.append(self.subnet_2)

    # 2. List subnet
    found = lookup()
    self.assertEqual(
        isinstance(found, list),
        True,
        "List subnets for zone should return a valid list"
    )
    self.assertEqual(
        len(found) == 1,
        True,
        "The number of subnets for zone (%s) should be equal to 1" % (len(found))
    )
    self.assertEqual(
        found[0].subnet == initial_cidr,
        True,
        "The subnet of subnet for zone (%s) should be equal to %s" % (found[0].subnet, initial_cidr)
    )

    # 3. Update subnet
    self.subnet_2.update(self.apiclient, subnet=updated_cidr)
    found = lookup()
    self.assertEqual(
        is_single(found) and found[0].subnet == updated_cidr,
        True,
        "The subnet of subnet for zone should be equal to %s" % updated_cidr
    )

    # 4. dedicate subnet to domain
    ZoneIpv4Subnet.dedicate(self.apiclient, id=self.subnet_2.id, domainid=self.domain.id)
    found = lookup()
    self.assertEqual(
        is_single(found) and found[0].domainid == self.domain.id,
        True,
        "The subnet should be dedicated to domain %s" % self.domain.id
    )

    # 5. released dedicated subnet
    self.subnet_2.release(self.apiclient)
    found = lookup()
    self.assertEqual(
        is_single(found) and not found[0].domainid,
        True,
        "The subnet should not be dedicated to domain %s" % self.domain.id
    )

    # 6. dedicate subnet to sub-domain/account
    ZoneIpv4Subnet.dedicate(
        self.apiclient,
        id=self.subnet_2.id,
        domainid=self.sub_domain.id,
        account=self.regular_user.name
    )
    found = lookup()
    self.assertEqual(
        is_single(found)
        and found[0].domainid == self.sub_domain.id and found[0].account == self.regular_user.name,
        True,
        "The subnet should be dedicated to account %s" % self.regular_user.name
    )

    # 7. released dedicated subnet
    self.subnet_2.release(self.apiclient)
    found = lookup()
    self.assertEqual(
        is_single(found) and not found[0].domainid,
        True,
        "The subnet should not be dedicated to account %s" % self.regular_user.name
    )

    # 8. delete subnet; it no longer needs to be cleaned up afterwards
    self.subnet_2.delete(self.apiclient)
    self.cleanup.remove(self.subnet_2)
@attr(tags=['advanced'], required_hardware=False)
def test_02_create_network_routed_mode_with_specified_cidr(self):
    """ Test for guest network with specified cidr

    # 1. Create Isolated network
    # 2. List subnet for network by subnet
    # 3. Delete the network
    # 4. List subnet for network by subnet. the subnet should be gone as well
    """
    self.message("Running test_02_create_network_routed_mode_with_specified_cidr")
    expected_cidr = NETWORK_CIDR_PREFIX + ".0/24"

    # 1. Create Isolated network
    network = Network.create(
        self.apiclient,
        self.services["network"],
        gateway=NETWORK_CIDR_PREFIX + ".1",
        netmask="255.255.255.0",
        networkofferingid=self.network_offering_isolated.id,
        zoneid=self.zone.id
    )
    self.cleanup.append(network)

    # 2. List subnet for network by subnet
    allocated = Ipv4SubnetForGuestNetwork.list(self.apiclient, subnet=expected_cidr)
    subnet_is_allocated = (
        isinstance(allocated, list) and len(allocated) == 1
        and allocated[0].subnet == expected_cidr and allocated[0].state == "Allocated"
    )
    self.assertEqual(
        subnet_is_allocated,
        True,
        "The subnet should be added for network %s" % network.name
    )

    # 3. Delete the network
    network.delete(self.apiclient)
    self.cleanup.remove(network)

    # 4. List subnet for network by subnet. the subnet should be gone as well
    network_cidr = allocated[0].subnet
    leftover = Ipv4SubnetForGuestNetwork.list(self.apiclient, subnet=network_cidr)
    self.assertEqual(
        not (isinstance(leftover, list) and len(leftover) != 0),
        True,
        "The subnet %s should be removed for network %s" % (network_cidr, network.name)
    )
@attr(tags=['advanced'], required_hardware=False)
def test_03_create_subnets_for_guest_network(self):
    """ Test for subnets for guest network with cidr/cidrsize

    # 1. Create subnet with cidr for guest network
    # 2. List subnets for network
    # 3. delete subnet for network
    # 4. Create subnet with cidrsize
    # 5. List subnet for network
    # 6. delete subnet for network
    """
    self.message("Running test_03_create_subnets_for_guest_network")

    def create_list_delete(failure_template, **subnet_spec):
        # Create a guest-network subnet under subnet_1, check that it is listed
        # exactly once, then delete it again.
        guest_subnet = Ipv4SubnetForGuestNetwork.create(
            self.apiclient,
            parentid=self.subnet_1.id,
            **subnet_spec
        )
        self.cleanup.append(guest_subnet)
        listed = Ipv4SubnetForGuestNetwork.list(self.apiclient, subnet=guest_subnet.subnet)
        self.assertEqual(
            isinstance(listed, list) and len(listed) == 1,
            True,
            failure_template % guest_subnet.subnet
        )
        guest_subnet.delete(self.apiclient)
        self.cleanup.remove(guest_subnet)

    # 1-3. subnet requested with an explicit cidr
    create_list_delete("The subnet should be created for subnet_network_1 %s",
                       subnet=SUBNET_1_PREFIX + ".0/26")
    # 4-6. subnet requested by cidrsize only
    create_list_delete("The subnet should be created for subnet_network_2 %s",
                       cidrsize=26)
@attr(tags=['advanced'], required_hardware=False)
def test_04_create_isolated_network_routed_mode_with_cidrsize(self):
    """ Test for subnet and guest network with cidrsize

    # 1. Create Isolated network with cidrsize
    # 2. List subnet for network by networkid
    # 3. Delete the network
    # 4. List subnet for network by network cidr, it should be removed
    """
    self.message("Running test_04_create_isolated_network_routed_mode_with_cidrsize")

    # 1. Create Isolated network with cidrsize
    network = Network.create(
        self.apiclient,
        self.services["network"],
        networkofferingid=self.network_offering_isolated.id,
        zoneid=self.zone.id,
        cidrsize=26
    )
    self.cleanup.append(network)

    # 2. List subnet for network by networkid
    allocated = Ipv4SubnetForGuestNetwork.list(self.apiclient, networkid=network.id)
    subnet_is_allocated = (
        isinstance(allocated, list) and len(allocated) == 1
        and allocated[0].networkid == network.id and allocated[0].state == "Allocated"
    )
    self.assertEqual(
        subnet_is_allocated,
        True,
        "The subnet should be created for isolated_network %s" % network.name
    )

    # 3. Delete the network
    network.delete(self.apiclient)
    self.cleanup.remove(network)

    # 4. List subnet for network by network cidr, it should be removed
    network_cidr = allocated[0].subnet
    leftover = Ipv4SubnetForGuestNetwork.list(self.apiclient, subnet=network_cidr)
    self.assertEqual(
        not (isinstance(leftover, list) and len(leftover) != 0),
        True,
        "The subnet should be removed for isolated_network %s" % network.name
    )
@attr(tags=['advanced'], required_hardware=False)
def test_05_create_vpc_routed_mode_with_cidrsize(self):
    """ Test for Routed VPC with cidrsize

    # 1. Create VPC with cidrsize
    # 2. List subnet for network by vpcid
    # 3. Delete the VPC
    # 4. List subnet for network by vpc cidr, it should be removed
    """
    self.message("Running test_05_create_vpc_routed_mode_with_cidrsize")
    # 1. Create VPC with cidrsize
    # The VPC test data is shared by all tests of the class. Remove its "cidr"
    # entry (presumably so that only cidrsize is sent -- confirm in VPC.create).
    # pop() with a default is used instead of `del` so the test does not die
    # with KeyError when the entry is already gone.
    self.services["vpc"].pop("cidr", None)
    vpc = VPC.create(self.apiclient,
                     self.services["vpc"],
                     vpcofferingid=self.vpc_offering.id,
                     zoneid=self.zone.id,
                     cidrsize=26,
                     start=False
                     )
    self.cleanup.append(vpc)
    # 2. List subnet for network by vpcid
    subnets = Ipv4SubnetForGuestNetwork.list(
        self.apiclient,
        vpcid=vpc.id
    )
    self.assertEqual(
        isinstance(subnets, list) and len(subnets) == 1
        and subnets[0].vpcid == vpc.id and subnets[0].state == "Allocated",
        True,
        "The subnet should be created for vpc %s" % vpc.name
    )
    # 3. Delete the VPC
    vpc.delete(self.apiclient)
    self.cleanup.remove(vpc)
    # 4. List subnet for network by vpc cidr, it should be removed
    vpc_cidr = subnets[0].subnet
    subnets = Ipv4SubnetForGuestNetwork.list(
        self.apiclient,
        subnet=vpc_cidr
    )
    self.assertEqual(
        not isinstance(subnets, list) or len(subnets) == 0,
        True,
        "The subnet should be removed for vpc %s" % vpc.name
    )
@attr(tags=['advanced'], required_hardware=False)
def test_06_isolated_network_with_routed_mode(self):
    """ Test for Isolated Network with Routed mode

    # 1. Create Isolated network
    # 2. Create VM in the network

    Both resources are stored in module globals for later tests and are
    registered in the class-level cleanup list.
    """
    global test_network, test_network_vm
    self.message("Running test_06_isolated_network_with_routed_mode")

    zone_id = self.zone.id
    # Ownership shared by the network and the VM: the regular user in the sub-domain.
    owner = {"domainid": self.sub_domain.id, "accountid": self.regular_user.name}

    # 1. Create Isolated network
    test_network = Network.create(
        self.apiclient,
        self.services["network"],
        networkofferingid=self.network_offering_isolated.id,
        zoneid=zone_id,
        gateway=NETWORK_CIDR_PREFIX + ".1",
        netmask="255.255.255.0",
        **owner
    )
    self._cleanup.append(test_network)

    # 2. Create VM in the network
    test_network_vm = VirtualMachine.create(
        self.regular_user_apiclient,
        self.services["virtual_machine"],
        zoneid=zone_id,
        networkids=test_network.id,
        serviceofferingid=self.service_offering.id,
        templateid=self.template.id,
        **owner
    )
    self._cleanup.append(test_network_vm)
@attr(tags=['advanced'], required_hardware=False)
def test_07_vpc_and_tier_with_routed_mode(self):
    """ Test for VPC/tier with Routed mode

    # 1. Create VPC
    # 2. Create Network ACL (egress = Deny, ingress = Deny)
    # 3. Create VPC tier with Network ACL in the VPC
    # 4. Create VM in the VPC tier

    The VPC, ACL list, tier and VM are stored in module globals for later
    tests; VPC, tier and VM are registered in the class-level cleanup list.
    """
    global test_vpc, test_network_acl, test_vpc_tier, test_vpc_vm
    self.message("Running test_07_vpc_and_tier_with_routed_mode")

    zone_id = self.zone.id
    domain_id = self.sub_domain.id
    account_name = self.regular_user.name

    # 1. Create VPC
    vpc_services = self.services["vpc"]
    vpc_services["cidr"] = VPC_CIDR_PREFIX + ".0.0/22"
    test_vpc = VPC.create(
        self.apiclient,
        vpc_services,
        vpcofferingid=self.vpc_offering.id,
        zoneid=zone_id,
        domainid=domain_id,
        account=account_name,
        start=False
    )
    self._cleanup.append(test_vpc)

    # 2. Create Network ACL (egress = Deny, ingress = Deny)
    acl_label = "test-network-acl"
    test_network_acl = NetworkACLList.create(
        self.apiclient,
        services={},
        name=acl_label,
        description=acl_label,
        vpcid=test_vpc.id
    )

    # 3. Create VPC tier with Network ACL in the VPC
    test_vpc_tier = Network.create(
        self.regular_user_apiclient,
        self.services["network"],
        networkofferingid=self.vpc_network_offering.id,
        zoneid=zone_id,
        domainid=domain_id,
        accountid=account_name,
        vpcid=test_vpc.id,
        gateway=VPC_CIDR_PREFIX + ".1.1",
        netmask="255.255.255.0",
        aclid=test_network_acl.id
    )
    self._cleanup.append(test_vpc_tier)

    # 4. Create VM in the VPC tier
    test_vpc_vm = VirtualMachine.create(
        self.regular_user_apiclient,
        self.services["virtual_machine"],
        zoneid=zone_id,
        domainid=domain_id,
        accountid=account_name,
        networkids=test_vpc_tier.id,
        serviceofferingid=self.service_offering.id,
        templateid=self.template.id
    )
    self._cleanup.append(test_vpc_vm)
@attr(tags=['advanced'], required_hardware=False)
def test_08_vpc_and_tier_failed_cases(self):
    """ Test for VPC/tier with Routed mode (some failed cases)

    # 1. create VPC with Routed mode
    # 2. create network offering with NATTED mode, create vpc tier, it should fail
    # 3. create vpc tier not in the vpc cidr, it should fail

    The test fails when either tier creation unexpectedly succeeds.
    """
    self.message("Running test_08_vpc_and_tier_failed_cases")
    # 1. Create VPC
    self.services["vpc"]["cidr"] = VPC_CIDR_PREFIX + ".8.0/22"
    test_vpc_2 = VPC.create(self.apiclient,
                            self.services["vpc"],
                            vpcofferingid=self.vpc_offering.id,
                            zoneid=self.zone.id,
                            domainid=self.sub_domain.id,
                            account=self.regular_user.name,
                            start=False
                            )
    self.cleanup.append(test_vpc_2)

    # 2. create network offering with NATTED mode, create vpc tier, it should fail
    nw_offering_isolated_vpc = NetworkOffering.create(
        self.apiclient,
        self.services["nw_offering_isolated_vpc"]
    )
    self.cleanup.append(nw_offering_isolated_vpc)
    nw_offering_isolated_vpc.update(self.apiclient, state='Enabled')
    # NOTE(review): this gateway (x.x.1.1) lies outside the VPC cidr (x.x.8.0/22),
    # so the creation may be rejected because of the cidr rather than the NATTED
    # offering -- consider a gateway inside the VPC cidr; confirm the intent.
    #
    # self.fail() raises AssertionError, which `except Exception` would catch.
    # It therefore lives in the `else` branch, where it can actually fail the test.
    try:
        test_vpc_tier_2 = Network.create(self.regular_user_apiclient,
                                         self.services["network"],
                                         networkofferingid=nw_offering_isolated_vpc.id,
                                         zoneid=self.zone.id,
                                         domainid=self.sub_domain.id,
                                         accountid=self.regular_user.name,
                                         vpcid=test_vpc_2.id,
                                         gateway=VPC_CIDR_PREFIX + ".1.1",
                                         netmask="255.255.255.0"
                                         )
    except Exception as ex:
        self.message("Failed to create vpc network due to %s, which is expected behaviour" % ex)
    else:
        self.cleanup.append(test_vpc_tier_2)
        self.fail("Created vpc network successfully, but expected to fail")

    # 3. create vpc tier not in the vpc cidr, it should fail
    try:
        test_vpc_tier_3 = Network.create(self.regular_user_apiclient,
                                         self.services["network"],
                                         networkofferingid=self.vpc_network_offering.id,
                                         zoneid=self.zone.id,
                                         domainid=self.sub_domain.id,
                                         accountid=self.regular_user.name,
                                         vpcid=test_vpc_2.id,
                                         gateway=VPC_CIDR_PREFIX + ".31.1",
                                         netmask="255.255.255.0"
                                         )
    except Exception as ex:
        self.message("Failed to create vpc network due to %s, which is expected behaviour" % ex)
    else:
        self.cleanup.append(test_vpc_tier_3)
        self.fail("Created vpc network successfully, but expected to fail")
@attr(tags=['advanced'], required_hardware=False)
def test_09_connectivity_between_network_and_vpc_tier(self):
    """ Test for connectivity between VMs in the Isolated Network and VPC/tier"""
    """
    # 0. Get static routes of Network/VPC
    # 1. Add static routes in VRs manually
    # 2. Test VM2 in VR1-Network (ping/ssh should fail)
    # 3. Test VM1 in VR2-VPC (ping/ssh should fail)
    # 4. Create Ingress rules in Network ACL for VPC
    # 5. Create Egress rules in Network ACL for VPC
    # 6. Test VM2 in VR1-Network (ping/ssh should succeed)
    # 7. Test VM1 in VR2-VPC (ping/ssh should fail)
    # 8. Create IPv4 firewalls for Isolated network
    # 9. Test VM2 in VR1-Network (ping/ssh should succeed)
    # 10. Test VM1 in VR2-VPC (ping/ssh should succeed)
    # 11. Delete Network ACL rules for VPC
    # 12. Delete IPv4 firewall rules for Network
    # 13. Test VM2 in VR1-Network (ping/ssh should fail)
    # 14. Test VM1 in VR2-VPC (ping/ssh should fail)
    """
    # Relies on the module globals filled in by test_06 (network and its VM) and
    # test_07 (VPC, tier, tier VM, ACL list). Only ping is exercised below;
    # ssh is mentioned in the step titles but not tested here.
    self.message("Running test_09_connectivity_between_network_and_vpc_tier")
    # 0. Get static routes of Network/VPC
    network_ip4routes = []
    if test_network:
        network_ip4routes = Network.list(
            self.apiclient,
            id=test_network.id,
            listall=True
        )[0].ip4routes
    else:
        self.skipTest("test_network is not created")
    vpc_ip4routes = []
    if test_vpc:
        vpc_ip4routes = VPC.list(
            self.apiclient,
            id=test_vpc.id,
            listall=True
        )[0].ip4routes
    else:
        self.skipTest("test_vpc is not created")
    network_router = self.get_router(networkid=test_network.id)
    vpc_router = self.get_router(vpcid=test_vpc.id)
    # Test VM1 in VR1-Network (wait until ping works)
    self.verifyPingFromRouter(network_router, test_network_vm, retries=MAX_RETRIES)
    # Test VM2 in VR2-VPC (wait until ping works)
    self.verifyPingFromRouter(vpc_router, test_vpc_vm, retries=MAX_RETRIES)
    # 1. Add static routes in VRs manually
    # NOTE(review): this guard runs after both routers have already been used above.
    if not network_router or not vpc_router:
        self.skipTest("network_router (%s) or vpc_router (%s) does not exist" % (network_router, vpc_router))
    # Each router gets the routes of the *other* side.
    for ip4route in network_ip4routes:
        self.run_command_in_router(vpc_router, "ip route add %s via %s" % (ip4route.subnet, ip4route.gateway))
    for ip4route in vpc_ip4routes:
        self.run_command_in_router(network_router, "ip route add %s via %s" % (ip4route.subnet, ip4route.gateway))
    # 2. Test VM2 in VR1-Network (ping/ssh should fail)
    self.verifyPingFromRouter(network_router, test_vpc_vm, expected=False)
    # 3. Test VM1 in VR2-VPC (ping/ssh should fail)
    self.verifyPingFromRouter(vpc_router, test_network_vm, expected=False)
    # Expected nftables rules in the VPC router; indexes 0-1 are the FORWARD jumps,
    # the entries appended in steps 4-5 occupy indexes 2-5.
    vpc_router_rules = [{"chain": "FORWARD",
                         "rule": "ip daddr %s jump eth2_ingress_policy" % test_vpc_tier.cidr},
                        {"chain": "FORWARD",
                         "rule": "ip saddr %s jump eth2_egress_policy" % test_vpc_tier.cidr}]
    vpc_acl_rules = []
    # 4. Create Ingress rules in Network ACL for VPC
    # 4a. ICMP (all types) from the isolated network
    rule = {}
    rule["traffictype"] = "Ingress"
    rule["cidrlist"] = test_network.cidr
    rule["protocol"] = "icmp"
    rule["icmptype"] = -1
    rule["icmpcode"] = -1
    vpc_acl_rules.append(self.createNetworkAclRule(rule))
    vpc_router_rules.append({"chain": "eth2_ingress_policy",
                             "rule": "ip saddr %s icmp type %s accept" % (test_network.cidr, ICMPv4_ALL_TYPES)})
    self.verifyNftablesRulesInRouter(vpc_router, vpc_router_rules)
    # 4b. TCP port 22 from the isolated network
    rule = {}
    rule["traffictype"] = "Ingress"
    rule["cidrlist"] = test_network.cidr
    rule["protocol"] = "tcp"
    rule["startport"] = 22
    rule["endport"] = 22
    vpc_acl_rules.append(self.createNetworkAclRule(rule))
    vpc_router_rules.append({"chain": "eth2_ingress_policy",
                             "rule": "ip saddr %s tcp dport 22 accept" % test_network.cidr})
    self.verifyNftablesRulesInRouter(vpc_router, vpc_router_rules)
    # 4c. ICMP (all types) from the public IP of the isolated network's router
    rule = {}
    rule["traffictype"] = "Ingress"
    rule["cidrlist"] = network_router.publicip + "/32"
    rule["protocol"] = "icmp"
    rule["icmptype"] = -1
    rule["icmpcode"] = -1
    vpc_acl_rules.append(self.createNetworkAclRule(rule))
    vpc_router_rules.append({"chain": "eth2_ingress_policy",
                             "rule": "ip saddr %s icmp type %s accept" % (network_router.publicip, ICMPv4_ALL_TYPES)})
    self.verifyNftablesRulesInRouter(vpc_router, vpc_router_rules)
    # 5. Create Egress rules in Network ACL for VPC
    # No cidrlist is given; the expected rule below matches daddr 0.0.0.0/0.
    rule = {}
    rule["traffictype"] = "Egress"
    rule["protocol"] = "icmp"
    rule["icmptype"] = -1
    rule["icmpcode"] = -1
    vpc_acl_rules.append(self.createNetworkAclRule(rule))
    vpc_router_rules.append({"chain": "eth2_egress_policy",
                             "rule": "ip daddr 0.0.0.0/0 icmp type %s accept" % ICMPv4_ALL_TYPES})
    self.verifyNftablesRulesInRouter(vpc_router, vpc_router_rules)
    # 6. Test VM2 in VR1-Network (ping/ssh should succeed)
    self.verifyPingFromRouter(network_router, test_vpc_vm, expected=True)
    # 7. Test VM1 in VR2-VPC (ping/ssh should fail)
    self.verifyPingFromRouter(vpc_router, test_network_vm, expected=False)
    # Expected nftables rules in the isolated network's router; indexes 0-1 are the
    # FORWARD jumps, the entries appended in step 8 occupy indexes 2-4.
    network_router_rules = [{"chain": "FORWARD",
                             "rule": "ip daddr %s jump fw_chain_ingress" % test_network.cidr},
                            {"chain": "FORWARD",
                             "rule": "ip saddr %s jump fw_chain_egress" % test_network.cidr}]
    network_routing_firewall_rules = []
    # 8. Create IPv4 firewalls for Isolated network
    # 8a. ICMP (all types) from the VPC cidr
    rule = {}
    rule["traffictype"] = "Ingress"
    rule["cidrlist"] = test_vpc.cidr
    rule["protocol"] = "icmp"
    rule["icmptype"] = -1
    rule["icmpcode"] = -1
    network_routing_firewall_rules.append(self.createIpv4RoutingFirewallRule(rule))
    network_router_rules.append({"chain": "fw_chain_ingress",
                                 "rule": "ip saddr %s ip daddr 0.0.0.0/0 icmp type %s accept" % (test_vpc.cidr, ICMPv4_ALL_TYPES)})
    self.verifyNftablesRulesInRouter(network_router, network_router_rules)
    # 8b. TCP port 22 from the VPC cidr
    rule = {}
    rule["traffictype"] = "Ingress"
    rule["cidrlist"] = test_vpc.cidr
    rule["protocol"] = "tcp"
    rule["startport"] = 22
    rule["endport"] = 22
    network_routing_firewall_rules.append(self.createIpv4RoutingFirewallRule(rule))
    network_router_rules.append({"chain": "fw_chain_ingress",
                                 "rule": "ip saddr %s ip daddr 0.0.0.0/0 tcp dport 22 accept" % test_vpc.cidr})
    self.verifyNftablesRulesInRouter(network_router, network_router_rules)
    # 8c. ICMP (all types) from the public IP of the VPC router
    rule = {}
    rule["traffictype"] = "Ingress"
    rule["cidrlist"] = vpc_router.publicip + "/32"
    rule["protocol"] = "icmp"
    rule["icmptype"] = -1
    rule["icmpcode"] = -1
    network_routing_firewall_rules.append(self.createIpv4RoutingFirewallRule(rule))
    network_router_rules.append({"chain": "fw_chain_ingress",
                                 "rule": "ip saddr %s ip daddr 0.0.0.0/0 icmp type %s accept" % (vpc_router.publicip, ICMPv4_ALL_TYPES)})
    self.verifyNftablesRulesInRouter(network_router, network_router_rules)
    # 9. Test VM2 in VR1-Network (ping/ssh should succeed)
    self.verifyPingFromRouter(network_router, test_vpc_vm, expected=True)
    # 10. Test VM1 in VR2-VPC (ping/ssh should succeed)
    self.verifyPingFromRouter(vpc_router, test_network_vm, expected=True)
    # 11. Delete Network ACL rules for VPC
    for rule in vpc_acl_rules:
        rule.delete(self.apiclient)
    # Flip the four ACL-derived expectations (indexes 2-5) to "must be absent".
    # Indexes 4 and 5 are written in a different order than they were appended;
    # that is harmless because all four are replaced.
    vpc_router_rules[2] = {"chain": "eth2_ingress_policy",
                           "rule": "ip saddr %s icmp type %s accept" % (test_network.cidr, ICMPv4_ALL_TYPES),
                           "exists": False}
    vpc_router_rules[3] = {"chain": "eth2_ingress_policy",
                           "rule": "ip saddr %s tcp dport 22 accept" % test_network.cidr,
                           "exists": False}
    vpc_router_rules[4] = {"chain": "eth2_egress_policy",
                           "rule": "ip daddr 0.0.0.0/0 icmp type %s accept" % ICMPv4_ALL_TYPES,
                           "exists": False}
    vpc_router_rules[5] = {"chain": "eth2_ingress_policy",
                           "rule": "ip saddr %s icmp type %s accept" % (network_router.publicip, ICMPv4_ALL_TYPES),
                           "exists": False}
    self.verifyNftablesRulesInRouter(vpc_router, vpc_router_rules)
    # 12. Delete IPv4 firewall rules for Network
    for rule in network_routing_firewall_rules:
        rule.delete(self.apiclient)
    # Flip the three firewall-derived expectations (indexes 2-4) to "must be absent".
    network_router_rules[2] = {"chain": "fw_chain_ingress",
                               "rule": "ip saddr %s ip daddr 0.0.0.0/0 icmp type %s accept" % (test_vpc.cidr, ICMPv4_ALL_TYPES),
                               "exists": False}
    network_router_rules[3] = {"chain": "fw_chain_ingress",
                               "rule": "ip saddr %s ip daddr 0.0.0.0/0 tcp dport 22 accept" % test_vpc.cidr,
                               "exists": False}
    network_router_rules[4] = {"chain": "fw_chain_ingress",
                               "rule": "ip saddr %s ip daddr 0.0.0.0/0 icmp type %s accept" % (vpc_router.publicip, ICMPv4_ALL_TYPES),
                               "exists": False}
    self.verifyNftablesRulesInRouter(network_router, network_router_rules)
    # 13. Test VM2 in VR1-Network (ping/ssh should fail)
    self.verifyPingFromRouter(network_router, test_vpc_vm, expected=False)
    # 14. Test VM1 in VR2-VPC (ping/ssh should fail)
    self.verifyPingFromRouter(vpc_router, test_network_vm, expected=False)
@attr(tags=['advanced'], required_hardware=False)
def test_10_bgp_peers(self):
    """ Test for BGP peers

    Drives one BGP peer through the API calls exercised below:
    # 1. Create bgppeer
    # 2. List bgppeer
    # 3. Update bgppeer
    # 4. dedicate bgppeer to domain
    # 5. released dedicated bgppeer
    # 6. dedicate bgppeer to sub-domain/account
    # 7. released dedicated bgppeer
    # 8. delete bgppeer
    """
    self.message("Running test_10_bgp_peers")

    # 1. Create bgp peer
    bgppeer_1 = BgpPeer.create(
        self.apiclient,
        zoneid=self.zone.id,
        asnumber=ASN_1,
        ipaddress=IP4_ADDR_1
    )
    self.cleanup.append(bgppeer_1)

    def fetch_peer():
        """List bgppeer_1 by id and return the single entry found.

        The shape of the response is asserted before any element is
        touched, so an empty or missing listing is reported as a test
        failure instead of an IndexError/TypeError raised while the
        failure message of a later assertion is being formatted.
        """
        peers = BgpPeer.list(
            self.apiclient,
            id=bgppeer_1.id
        )
        self.assertIsInstance(
            peers,
            list,
            "List bgppeers for zone should return a valid list"
        )
        self.assertEqual(
            len(peers),
            1,
            "The number of bgp peers (%s) should be equal to 1" % (len(peers))
        )
        return peers[0]

    # 2. List bgp peer
    peer = fetch_peer()
    self.assertTrue(
        peer.asnumber == ASN_1 and peer.ipaddress == IP4_ADDR_1,
        "The asnumber of bgp peer (%s) should be equal to %s, the ip address (%s) should be %s"
        % (peer.asnumber, ASN_1, peer.ipaddress, IP4_ADDR_1)
    )

    # 3. Update bgp peer
    bgppeer_1.update(
        self.apiclient,
        asnumber=ASN_2,
        ipaddress=IP4_ADDR_2
    )
    peer = fetch_peer()
    self.assertTrue(
        peer.asnumber == ASN_2 and peer.ipaddress == IP4_ADDR_2,
        "The asnumber of bgp peer (%s) should be equal to %s, the ip address (%s) should be %s"
        % (peer.asnumber, ASN_2, peer.ipaddress, IP4_ADDR_2)
    )

    # 4. dedicate bgp peer to domain
    BgpPeer.dedicate(
        self.apiclient,
        id=bgppeer_1.id,
        domainid=self.domain.id
    )
    peer = fetch_peer()
    self.assertEqual(
        peer.domainid,
        self.domain.id,
        "The bgppeer should be dedicated to domain %s" % self.domain.id
    )

    # 5. released dedicated bgp peer
    bgppeer_1.release(
        self.apiclient
    )
    peer = fetch_peer()
    self.assertFalse(
        peer.domainid,
        "The bgp peer should not be dedicated to domain %s" % self.domain.id
    )

    # 6. dedicate bgp peer to sub-domain/account
    BgpPeer.dedicate(
        self.apiclient,
        id=bgppeer_1.id,
        domainid=self.sub_domain.id,
        account=self.regular_user.name
    )
    peer = fetch_peer()
    self.assertTrue(
        peer.domainid == self.sub_domain.id and peer.account == self.regular_user.name,
        "The bgp peer should be dedicated to account %s" % self.regular_user.name
    )

    # 7. released dedicated bgp peer
    bgppeer_1.release(
        self.apiclient
    )
    peer = fetch_peer()
    self.assertFalse(
        peer.domainid,
        "The bgppeer should not be dedicated to account %s" % self.regular_user.name
    )

    # 8. delete bgp peer
    bgppeer_1.delete(
        self.apiclient
    )
    # Already deleted explicitly, so it must not be deleted again at teardown.
    self.cleanup.remove(bgppeer_1)
@attr(tags=['advanced'], required_hardware=False)
def test_11_isolated_network_with_dynamic_routed_mode(self):
    """ Test for Isolated Network with Dynamic Routed mode

    # 1. Create bgp_peer_1 and an Isolated network that uses it
    # 2. Create VM in the network
    # 3. Verify frr.conf in network VR
    # 4. Update network BGP peers (to bgp_peer_1 and bgp_peer_2)
    # 5. Verify frr.conf in network VR
    # 6. Reboot VR
    # 7. Verify frr.conf in network VR
    # 8. Update network BGP peers (to bgppeer_2)
    # 9. Verify frr.conf in network VR
    # 10. Update network BGP peers (to null)
    # 11. Verify frr.conf in network VR
    """
    self.message("Running test_11_isolated_network_with_dynamic_routed_mode")

    def register_peer(asn, address, secret):
        """Create a zone BGP peer and queue it for cleanup."""
        created = BgpPeer.create(
            self.apiclient,
            zoneid=self.zone.id,
            asnumber=asn,
            ipaddress=address,
            password=secret
        )
        self.cleanup.append(created)
        return created

    def neighbor_entries(peer, secret, present):
        """Return the two frr.conf expectations (remote-as, password) for a peer."""
        return [
            {"config": "neighbor %s remote-as %s" % (peer.ipaddress, peer.asnumber),
             "exists": present},
            {"config": "neighbor %s password %s" % (peer.ipaddress, secret),
             "exists": present},
        ]

    def network_entry():
        """Return the frr.conf expectation for the network's own CIDR."""
        return {"config": "network %s" % guest_network.cidr,
                "exists": True}

    # Ownership arguments shared by the network and the VM.
    owner = {
        "zoneid": self.zone.id,
        "domainid": self.sub_domain.id,
        "accountid": self.regular_user.name,
    }

    # 1. Create the first bgp peer, then the Isolated network with Dynamic routing
    peer_one = register_peer(ASN_1, IP4_ADDR_1, PASSWORD_1)
    guest_network = Network.create(
        self.apiclient,
        self.services["network"],
        networkofferingid=self.network_offering_dynamic.id,
        gateway=NETWORK_CIDR_PREFIX_DYNAMIC + ".1",
        netmask="255.255.255.0",
        bgppeerids=peer_one.id,
        **owner
    )
    self.cleanup.append(guest_network)

    # 2. Create VM in the network
    guest_vm = VirtualMachine.create(
        self.regular_user_apiclient,
        self.services["virtual_machine"],
        networkids=guest_network.id,
        serviceofferingid=self.service_offering.id,
        templateid=self.template.id,
        **owner
    )
    self.cleanup.append(guest_vm)
    router = self.get_router(networkid=guest_network.id)

    # 3. Verify frr.conf in network VR
    expected = neighbor_entries(peer_one, PASSWORD_1, True) + [network_entry()]
    self.verifyFrrConf(router, expected)

    # 4. Update network BGP peers (to bgp_peer_1 and bgp_peer_2)
    peer_two = register_peer(ASN_2, IP4_ADDR_2, PASSWORD_2)
    guest_network.changeBgpPeers(
        self.apiclient,
        bgppeerids=[peer_one.id, peer_two.id]
    )

    # 5. Verify frr.conf in network VR
    expected = (neighbor_entries(peer_one, PASSWORD_1, True)
                + neighbor_entries(peer_two, PASSWORD_2, True)
                + [network_entry()])
    self.verifyFrrConf(router, expected)

    # 6. Reboot VR
    self.rebootRouter(router)

    # 7. Verify frr.conf in network VR (router is looked up again after the reboot)
    router = self.get_router(networkid=guest_network.id)
    self.verifyFrrConf(router, expected)

    # 8. Update network BGP peers (to bgppeer_2)
    guest_network.changeBgpPeers(
        self.apiclient,
        bgppeerids=[peer_two.id]
    )

    # 9. Verify frr.conf in network VR: only the second peer remains
    expected = (neighbor_entries(peer_one, PASSWORD_1, False)
                + neighbor_entries(peer_two, PASSWORD_2, True)
                + [network_entry()])
    self.verifyFrrConf(router, expected)

    # 10. Update network BGP peers (to null)
    guest_network.changeBgpPeers(
        self.apiclient,
        bgppeerids=[]
    )

    # 11. Verify frr.conf in network VR
    # NOTE(review): both peers are expected to be present again here -
    # presumably an empty selection falls back to every available peer; confirm.
    expected = (neighbor_entries(peer_one, PASSWORD_1, True)
                + neighbor_entries(peer_two, PASSWORD_2, True)
                + [network_entry()])
    self.verifyFrrConf(router, expected)
@attr(tags=['advanced'], required_hardware=False)
def test_12_vpc_and_tier_with_dynamic_routed_mode(self):
    """ Test for VPC/tier with Dynamic Routed mode

    # 1. Create bgp peers
    # 2. Create VPC
    # 3. Create Network ACL
    # 4. Create VPC tier with Network ACL in the VPC
    # 5. Create VM in the VPC tier
    # 6. Verify frr.conf in VPC VR
    # 7. Update VPC BGP peers (to bgp_peer_1 and bgp_peer_2)
    # 8. Verify frr.conf in VPC VR
    # 9. Create VPC tier-2 with Network ACL in the VPC
    # 10. Create VM-2 in the VPC tier-2
    # 11. Verify frr.conf in VPC VR
    # 12. Reboot VPC VR
    # 13. Verify frr.conf in VPC VR
    # 14. Update VPC BGP peers (to bgppeer_2)
    # 15. Verify frr.conf in VPC VR
    # 16. Update VPC BGP peers (to null)
    # 17. Verify frr.conf in VPC VR
    """
    self.message("Running test_12_vpc_and_tier_with_dynamic_routed_mode")

    def register_peer(asn, address, secret):
        """Create a zone BGP peer and queue it for cleanup."""
        created = BgpPeer.create(
            self.apiclient,
            zoneid=self.zone.id,
            asnumber=asn,
            ipaddress=address,
            password=secret
        )
        self.cleanup.append(created)
        return created

    def add_tier(label, gateway_ip):
        """Create a /24 tier in the VPC under the shared ACL and queue it for cleanup."""
        self.services["network"]["name"] = label
        tier = Network.create(
            self.regular_user_apiclient,
            self.services["network"],
            networkofferingid=self.vpc_network_offering_dynamic.id,
            zoneid=self.zone.id,
            domainid=self.sub_domain.id,
            accountid=self.regular_user.name,
            vpcid=vpc.id,
            gateway=gateway_ip,
            netmask="255.255.255.0",
            aclid=acl.id
        )
        self.cleanup.append(tier)
        return tier

    def launch_vm(tier):
        """Deploy a VM on the given tier and queue it for cleanup."""
        vm = VirtualMachine.create(
            self.regular_user_apiclient,
            self.services["virtual_machine"],
            zoneid=self.zone.id,
            domainid=self.sub_domain.id,
            accountid=self.regular_user.name,
            networkids=tier.id,
            serviceofferingid=self.service_offering.id,
            templateid=self.template.id
        )
        self.cleanup.append(vm)
        return vm

    def frr_expectation(neighbors, tiers):
        """Build the verifyFrrConf input.

        neighbors: iterable of (peer, password, present) triples; each one
        yields a remote-as entry and a password entry carrying that flag.
        tiers: networks whose CIDR is expected as a "network" statement.
        """
        entries = []
        for peer, secret, present in neighbors:
            entries.append({"config": "neighbor %s remote-as %s" % (peer.ipaddress, peer.asnumber),
                            "exists": present})
            entries.append({"config": "neighbor %s password %s" % (peer.ipaddress, secret),
                            "exists": present})
        for tier in tiers:
            entries.append({"config": "network %s" % tier.cidr,
                            "exists": True})
        return entries

    # 1. Create bgp peers
    peer_one = register_peer(ASN_1, IP4_ADDR_1, PASSWORD_1)

    # 2.1 VPC offering for dynamic routing
    offering = VpcOffering.create(
        self.apiclient,
        VPC_OFFERING_DYNAMIC
    )
    self.cleanup.append(offering)
    offering.update(self.apiclient, state='Enabled')

    # 2.2 Create VPC
    self.services["vpc"]["cidr"] = VPC_CIDR_PREFIX + ".8.0/22"
    vpc = VPC.create(
        self.apiclient,
        self.services["vpc"],
        vpcofferingid=offering.id,
        zoneid=self.zone.id,
        domainid=self.sub_domain.id,
        account=self.regular_user.name,
        start=False,
        bgppeerids=peer_one.id
    )
    self.cleanup.append(vpc)

    # 3. Create Network ACL
    acl = NetworkACLList.create(
        self.apiclient,
        services={},
        name="test-network-acl-dynamic",
        description="test-network-acl-dynamic",
        vpcid=vpc.id
    )

    # 4. Create VPC tier with Network ACL in the VPC
    tier_one = add_tier("test_vpc_tier_dynamic_1", VPC_CIDR_PREFIX + ".8.1")

    # 5. Create VM in the VPC tier
    launch_vm(tier_one)
    router = self.get_router(vpcid=vpc.id)

    # 6. Verify frr.conf in VPC VR
    expected = frr_expectation([(peer_one, PASSWORD_1, True)], [tier_one])
    self.verifyFrrConf(router, expected)

    # 7. Update VPC BGP peers (to bgp_peer_1 and bgp_peer_2)
    peer_two = register_peer(ASN_2, IP4_ADDR_2, PASSWORD_2)
    vpc.changeBgpPeers(
        self.apiclient,
        bgppeerids=[peer_one.id, peer_two.id]
    )

    # 8. Verify frr.conf in VPC VR
    both_peers = [(peer_one, PASSWORD_1, True), (peer_two, PASSWORD_2, True)]
    expected = frr_expectation(both_peers, [tier_one])
    self.verifyFrrConf(router, expected)

    # 9. Create VPC tier-2 with Network ACL in the VPC
    tier_two = add_tier("test_vpc_tier_dynamic_2", VPC_CIDR_PREFIX + ".9.1")

    # 10. Create VM-2 in the VPC tier-2
    launch_vm(tier_two)

    # 11. Verify frr.conf in VPC VR
    expected = frr_expectation(both_peers, [tier_one, tier_two])
    self.verifyFrrConf(router, expected)

    # 12. Reboot VPC VR
    self.rebootRouter(router)

    # 13. Verify frr.conf in VPC VR (router is looked up again after the reboot)
    router = self.get_router(vpcid=vpc.id)
    self.verifyFrrConf(router, expected)

    # 14. Update VPC BGP peers (to bgppeer_2)
    vpc.changeBgpPeers(
        self.apiclient,
        bgppeerids=[peer_two.id]
    )

    # 15. Verify frr.conf in VPC VR: only the second peer remains
    expected = frr_expectation(
        [(peer_one, PASSWORD_1, False), (peer_two, PASSWORD_2, True)],
        [tier_one, tier_two]
    )
    self.verifyFrrConf(router, expected)

    # 16. Update VPC BGP peers (to null)
    vpc.changeBgpPeers(
        self.apiclient,
        bgppeerids=[]
    )

    # 17. Verify frr.conf in VPC VR
    # NOTE(review): both peers are expected to be present again here -
    # presumably an empty selection falls back to every available peer; confirm.
    expected = frr_expectation(both_peers, [tier_one, tier_two])
    self.verifyFrrConf(router, expected)
@attr(tags=['advanced'], required_hardware=False)
def test_13_asn_ranges(self):
    """ Test for ASN ranges

    # 1. Create an ASN range without overlap
    # 2. List ASN ranges by zoneid
    # 3. List ASN numbers by ASN range id
    # 4. Create an ASN range with overlap, it should fail
    # 5. Delete ASN range
    """
    self.message("Running test_13_asn_ranges")

    # 1. Create an ASN range without overlap
    asnrange_2 = ASNRange.create(
        self.apiclient,
        zoneid=self.zone.id,
        startasn=END_ASN + 100,
        endasn=END_ASN + 200
    )
    self.cleanup.append(asnrange_2)

    # 2. List ASN ranges by zoneid
    ranges = ASNRange.list(
        self.apiclient,
        zoneid=self.zone.id
    )
    self.assertIsInstance(
        ranges,
        list,
        "List ASN ranges by zoneid should return a valid list"
    )
    self.assertGreaterEqual(
        len(ranges),
        1,
        "The number of ASN ranges (%s) should be at least 1" % (len(ranges))
    )
    # Locate the range just created by its start ASN (first match wins).
    asnrange_2_new = next(
        (asn_range for asn_range in ranges if asn_range.startasn == asnrange_2.startasn),
        None
    )
    if asnrange_2_new is None:
        self.fail("Unable to find ASN range (%s-%s)" % (asnrange_2.startasn, asnrange_2.endasn))
    self.assertEqual(
        asnrange_2_new.endasn,
        asnrange_2.endasn,
        "The end ASN of ASN range (%s-%s) should be equal to %s"
        % (asnrange_2_new.startasn, asnrange_2_new.endasn, asnrange_2.endasn)
    )

    # 3. List ASN numbers by ASN range id
    asnumbers = ASNRange.listAsNumbers(
        self.apiclient,
        zoneid=self.zone.id,
        asnrangeid=asnrange_2.id
    )
    self.assertIsInstance(
        asnumbers,
        list,
        "List AS numbers should return a valid list"
    )
    expected_count = asnrange_2.endasn - asnrange_2.startasn + 1
    self.assertEqual(
        len(asnumbers),
        expected_count,
        "The number of asnumbers (%s) should be equal to %s" % (len(asnumbers), expected_count)
    )

    # 4. Create an ASN range with overlap, it should fail
    try:
        asnrange_3 = ASNRange.create(
            self.apiclient,
            zoneid=self.zone.id,
            startasn=END_ASN + 150,
            endasn=END_ASN + 250
        )
    except Exception:
        self.message("Failed to create ASN range but it is expected")
    else:
        # The failure is raised outside the try body on purpose: self.fail()
        # raises an AssertionError, which "except Exception" would otherwise
        # swallow and turn into a false pass.
        self.cleanup.append(asnrange_3)
        self.fail("Succeeded to create ASN range (%s-%s) but it should fail" % (asnrange_3.startasn, asnrange_3.endasn))

    # 5. Delete ASN range
    asnrange_2.delete(
        self.apiclient
    )
    # Already deleted explicitly, so it must not be deleted again at teardown.
    self.cleanup.remove(asnrange_2)