blob: efec43add7ca3a90c8a9a6bd2fb22d58994650b5 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
""" BVT test for IPv6 VPC"""
#Import Local Modules
from marvin.codes import FAILED
from marvin.cloudstackTestCase import cloudstackTestCase
from marvin.cloudstackAPI import (createGuestNetworkIpv6Prefix,
listGuestNetworkIpv6Prefixes,
deleteGuestNetworkIpv6Prefix)
from marvin.lib.utils import (isAlmostEqual,
random_gen,
get_process_status,
get_host_credentials)
from marvin.lib.base import (Configurations,
Domain,
NetworkOffering,
VpcOffering,
Account,
PublicIpRange,
Network,
VPC,
Router,
ServiceOffering,
VirtualMachine,
NIC,
Host,
NetworkACLList,
NetworkACL)
from marvin.lib.common import (get_domain,
get_zone,
get_test_template,
get_template)
from marvin.sshClient import SshClient
from marvin.cloudstackException import CloudstackAPIException
from marvin.lib.decoratorGenerators import skipTestIf
from nose.plugins.attrib import attr
from ipaddress import IPv6Network
from random import getrandbits, choice, randint
import time
import logging
import threading
ipv6_offering_config_name = "ipv6.offering.enabled"
ULA_BASE = IPv6Network("fd00::/8")
PREFIX_OPTIONS = [i for i in range(48, 65, 4)]
ACL_TABLE = "ip6_acl"
ACL_CHAINS_SUFFIX = {
"Ingress": "_ingress_policy",
"Egress": "_egress_policy"
}
CIDR_IPV6_ANY = "::/0"
ICMPV6_TYPE = {
1: "destination-unreachable",
2: "packet-too-big",
3: "time-exceeded",
4: "parameter-problem",
128: "echo-request",
129: "echo-reply",
130: "mld-listener-query",
131: "mld-listener-report",
132: "mld-listener-done",
133: "nd-router-solicit",
134: "nd-router-advert",
135: "nd-neighbor-solicit",
136: "nd-neighbor-advert",
137: "nd-redirect",
138: "router-renumbering",
141: "ind-neighbor-solicit",
142: "ind-neighbor-advert",
143: "mld2-listener-report"
}
ICMPV6_CODE_TYPE = {
0: "no-route",
1: "admin-prohibited",
3: "addr-unreachable",
4: "port-unreachable",
5: "policy-fail",
6: "reject-route"
}
ICMPV6_TYPE_ANY = "{ destination-unreachable, packet-too-big, time-exceeded, parameter-problem, echo-request, echo-reply, mld-listener-query, mld-listener-report, mld-listener-done, nd-router-solicit, nd-router-advert, nd-neighbor-solicit, nd-neighbor-advert, nd-redirect, router-renumbering }"
TCP_UDP_PORT_ANY = "{ 0-65535 }"
VPC_ROUTER_PUBLIC_NIC = "eth1"
VPC_ROUTER_GUEST_NIC = "eth2"
VPC_DATA = {
"cidr": "10.1.0.0/22",
"tier1_gateway": "10.1.1.1",
"tier2_gateway": "10.1.2.1",
"tier_netmask": "255.255.255.0"
}
ROUTE_TEST_VPC_DATA = {
"cidr": "10.2.0.0/22",
"tier1_gateway": "10.2.1.1",
"tier_netmask": "255.255.255.0"
}
SLEEP_BEFORE_VR_CHANGES = 90
PING_RETRIES = 5
PING_SLEEP = 20
class TestIpv6Vpc(cloudstackTestCase):
@classmethod
def setUpClass(cls):
testClient = super(TestIpv6Vpc, cls).getClsTestClient()
cls.services = testClient.getParsedTestDataConfig()
cls.apiclient = testClient.getApiClient()
cls.dbclient = testClient.getDbConnection()
cls.test_ipv6_guestprefix = None
cls.initial_ipv6_offering_enabled = None
cls._cleanup = []
cls.routerDetailsMap = {}
cls.vpcAllowAllAclDetailsMap = {}
cls.logger = logging.getLogger('TestIpv6Vpc')
cls.zone = get_zone(cls.apiclient, testClient.getZoneForTests())
cls.services['mode'] = cls.zone.networktype
cls.ipv6NotSupported = False
ipv6_guestprefix = cls.getGuestIpv6Prefix()
if ipv6_guestprefix == None:
cls.ipv6NotSupported = True
if cls.ipv6NotSupported == False:
ipv6_publiciprange = cls.getPublicIpv6Range()
if ipv6_publiciprange == None:
cls.ipv6NotSupported = True
if cls.ipv6NotSupported == False:
cls.initial_ipv6_offering_enabled = Configurations.list(
cls.apiclient,
name=ipv6_offering_config_name)[0].value
Configurations.update(cls.apiclient,
ipv6_offering_config_name,
"true")
cls.domain = get_domain(cls.apiclient)
cls.account = Account.create(
cls.apiclient,
cls.services["account"],
admin=False,
domainid=cls.domain.id
)
cls._cleanup.append(cls.account)
cls.hypervisor = testClient.getHypervisorInfo()
cls.template = get_test_template(
cls.apiclient,
cls.zone.id,
cls.hypervisor)
else:
cls.debug("IPv6 is not supported, skipping tests!")
return
@classmethod
def tearDownClass(cls):
if cls.initial_ipv6_offering_enabled != None:
Configurations.update(cls.apiclient,
ipv6_offering_config_name,
cls.initial_ipv6_offering_enabled)
try:
super(TestIpv6Vpc, cls).tearDownClass()
finally:
if cls.test_ipv6_guestprefix != None:
cmd = deleteGuestNetworkIpv6Prefix.deleteGuestNetworkIpv6PrefixCmd()
cmd.id = cls.test_ipv6_guestprefix.id
cls.apiclient.deleteGuestNetworkIpv6Prefix(cmd)
@classmethod
def getGuestIpv6Prefix(cls):
cmd = listGuestNetworkIpv6Prefixes.listGuestNetworkIpv6PrefixesCmd()
cmd.zoneid = cls.zone.id
ipv6_prefixes_response = cls.apiclient.listGuestNetworkIpv6Prefixes(cmd)
if isinstance(ipv6_prefixes_response, list) == True and len(ipv6_prefixes_response) > 0:
return ipv6_prefixes_response[0]
ipv6_guestprefix_service = cls.services["guestip6prefix"]
cmd = createGuestNetworkIpv6Prefix.createGuestNetworkIpv6PrefixCmd()
cmd.zoneid = cls.zone.id
cmd.prefix = ipv6_guestprefix_service["prefix"]
ipv6_guestprefix = cls.apiclient.createGuestNetworkIpv6Prefix(cmd)
cls.test_ipv6_guestprefix = ipv6_guestprefix
return ipv6_guestprefix
@classmethod
def getPublicIpv6Range(cls):
list_public_ip_range_response = PublicIpRange.list(
cls.apiclient,
zoneid=cls.zone.id
)
ipv4_range_vlan = None
if isinstance(list_public_ip_range_response, list) == True and len(list_public_ip_range_response) > 0:
for ip_range in list_public_ip_range_response:
if ip_range.ip6cidr != None and ip_range.ip6gateway != None:
return ip_range
if ip_range.netmask != None and ip_range.gateway != None:
vlan = ip_range.vlan
if ipv4_range_vlan == None and vlan.startswith("vlan://"):
vlan = vlan.replace("vlan://", "")
if vlan == "untagged":
ipv4_range_vlan = None
else:
ipv4_range_vlan = int(vlan)
ipv6_publiciprange_service = cls.services["publicip6range"]
ipv6_publiciprange_service["zoneid"] = cls.zone.id
ipv6_publiciprange_service["vlan"] = ipv4_range_vlan
ipv6_publiciprange = PublicIpRange.create(
cls.apiclient,
ipv6_publiciprange_service
)
cls._cleanup.append(ipv6_publiciprange)
return ipv6_publiciprange
def setUp(self):
self.services = self.testClient.getParsedTestDataConfig()
self.apiclient = self.testClient.getApiClient()
self.dbclient = self.testClient.getDbConnection()
self.thread = None
self.cleanup = []
return
def tearDown(self):
try:
if self.thread and self.thread.is_alive():
self.thread.join(5*60)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
finally:
super(TestIpv6Vpc, self).tearDown()
return
def getRandomIpv6Cidr(self):
prefix_length = choice(PREFIX_OPTIONS)
random_suffix = getrandbits(40) << (128-prefix_length)
base_address = ULA_BASE.network_address + random_suffix
return str(IPv6Network((base_address, prefix_length)))
def createTinyServiceOffering(self):
self.service_offering = ServiceOffering.create(
self.apiclient,
self.services["service_offerings"]["big"],
)
self.cleanup.append(self.service_offering)
def createVpcOfferingInternal(self, is_redundant, is_ipv6):
off_service = self.services["vpc_offering"]
if is_redundant:
off_service["serviceCapabilityList"] = {
"SourceNat": {
"RedundantRouter": 'true'
},
}
if is_ipv6:
off_service["internetprotocol"] = "dualstack"
vpc_offering = VpcOffering.create(
self.apiclient,
off_service
)
self.cleanup.append(vpc_offering)
vpc_offering.update(self.apiclient, state='Enabled')
return vpc_offering
def createIpv4VpcOffering(self, is_redundant=False):
self.vpc_offering = self.createVpcOfferingInternal(is_redundant, False)
def createIpv6VpcOffering(self, is_redundant=False):
self.vpc_offering = self.createVpcOfferingInternal(is_redundant, True)
def createIpv6VpcOfferingForUpdate(self, is_redundant=False):
self.vpc_offering_update = self.createVpcOfferingInternal(is_redundant, True)
def createNetworkTierOfferingInternal(self, is_ipv6, remove_lb=True):
off_service = self.services["nw_offering_isolated_vpc"]
if not remove_lb: # Remove Lb service
if "serviceProviderList" in off_service and "Lb" in off_service["serviceProviderList"].keys():
providers = off_service["serviceProviderList"]
providers.pop("Lb")
off_service["serviceProviderList"] = providers
if "supportedservices" in off_service and "Lb" in off_service["supportedservices"]:
supportedServices = off_service["supportedservices"].split(",")
supportedServices.remove("Lb")
off_service["supportedservices"] = ",".join(supportedServices)
if is_ipv6:
off_service["internetprotocol"] = "dualstack"
network_offering = NetworkOffering.create(
self.apiclient,
off_service,
conservemode=False
)
self.cleanup.append(network_offering)
network_offering.update(self.apiclient, state='Enabled')
return network_offering
def createIpv4NetworkTierOffering(self):
self.network_offering = self.createNetworkTierOfferingInternal(False)
def createIpv6NetworkTierOffering(self, remove_lb=True):
self.network_offering = self.createNetworkTierOfferingInternal(True)
def createIpv6NetworkTierOfferingForUpdate(self):
self.network_offering_update = self.createNetworkTierOfferingInternal(True)
def deployAllowAllVpcInternal(self, cidr):
service = self.services["vpc"]
service["cidr"] = cidr
vpc = VPC.create(
self.apiclient,
self.services["vpc"],
vpcofferingid=self.vpc_offering.id,
zoneid=self.zone.id,
account=self.account.name,
domainid=self.account.domainid
)
self.cleanup.append(vpc)
acl = NetworkACLList.create(
self.apiclient,
services={},
name="allowall",
description="allowall",
vpcid=vpc.id
)
rule ={
"protocol": "all",
"traffictype": "ingress",
}
NetworkACL.create(self.apiclient,
services=rule,
aclid=acl.id
)
rule["traffictype"] = "egress"
NetworkACL.create(self.apiclient,
services=rule,
aclid=acl.id
)
self.vpcAllowAllAclDetailsMap[vpc.id] = acl.id
return vpc
def deployVpc(self):
self.vpc = self.deployAllowAllVpcInternal(VPC_DATA["cidr"])
def deployNetworkTierInternal(self, network_offering_id, vpc_id, tier_gateway, tier_netmask, acl_id=None, tier_name=None):
if not acl_id and vpc_id in self.vpcAllowAllAclDetailsMap:
acl_id = self.vpcAllowAllAclDetailsMap[vpc_id]
service = self.services["ntwk"]
if tier_name:
service["name"] = tier_name
service["displaytext"] = "vpc-%s" % tier_name
network = Network.create(
self.apiclient,
service,
self.account.name,
self.account.domainid,
networkofferingid=network_offering_id,
vpcid=vpc_id,
zoneid=self.zone.id,
gateway=tier_gateway,
netmask=tier_netmask,
aclid=acl_id
)
self.cleanup.append(network)
return network
def deployNetworkTier(self):
self.network = self.deployNetworkTierInternal(
self.network_offering.id,
self.vpc.id,
VPC_DATA["tier1_gateway"],
VPC_DATA["tier_netmask"]
)
def deployNetworkTierVmInternal(self, network):
if self.template == FAILED:
assert False, "get_test_template() failed to return template"
self.services["virtual_machine"]["zoneid"] = self.zone.id
virtual_machine = VirtualMachine.create(
self.apiclient,
self.services["virtual_machine"],
templateid=self.template.id,
accountid=self.account.name,
domainid=self.account.domainid,
networkids=network,
serviceofferingid=self.service_offering.id
)
self.cleanup.append(virtual_machine)
return virtual_machine
def deployNetworkTierVm(self):
self.virtual_machine = self.deployNetworkTierVmInternal(self.network.id)
def checkIpv6Vpc(self):
self.debug("Listing VPC: %s" % (self.vpc.name))
ipv6_vpc = VPC.list(self.apiclient,listall="true",id=self.vpc.id)
self.assertTrue(
isinstance(ipv6_vpc, list),
"Check listVpcs response returns a valid list"
)
self.assertEqual(
len(ipv6_vpc),
1,
"Network not found"
)
ipv6_vpc = ipv6_vpc[0]
self.assertNotEqual(ipv6_vpc.ip6routes,
None,
"IPv6 routes for network is empty")
def checkIpv6NetworkTierBasic(self):
self.debug("Listing network: %s" % (self.network.name))
ipv6_network = Network.list(self.apiclient,listall="true",id=self.network.id)
self.assertTrue(
isinstance(ipv6_network, list),
"Check listNetworks response returns a valid list"
)
self.assertEqual(
len(ipv6_network),
1,
"Network not found"
)
ipv6_network = ipv6_network[0]
self.assertNotEqual(ipv6_network,
None,
"User is not able to retrieve network details %s" % self.network.id)
self.assertNotEqual(ipv6_network.ip6cidr,
None,
"IPv6 CIDR for network is empty")
self.assertNotEqual(ipv6_network.ip6gateway,
None,
"IPv6 gateway for network is empty")
self.assertNotEqual(ipv6_network.ip6routes,
None,
"IPv6 routes for network is empty")
def checkIpv6VpcRoutersBasic(self):
self.debug("Listing routers for VPC: %s" % self.vpc.name)
self.routers = Router.list(
self.apiclient,
vpcid=self.vpc.id,
listall=True
)
self.assertTrue(
isinstance(self.routers, list),
"Check listRouters response returns a valid list"
)
self.assertTrue(
len(self.routers) > 0,
"Router for the network isn't found"
)
for router in self.routers:
self.assertFalse(
router.isredundantrouter == True and router.redundantstate == "FAULT",
"Router for the network is in FAULT state"
)
nics = router.nic
for nic in nics:
if (nic.traffictype == 'Guest' and router.isredundantrouter == False) or nic.traffictype == 'Public':
self.assertNotEqual(nic.ip6address,
None,
"IPv6 address for router %s NIC is empty" % nic.traffictype)
self.assertNotEqual(nic.ip6cidr,
None,
"IPv6 CIDR for router %s NIC is empty" % nic.traffictype)
self.assertNotEqual(nic.ip6gateway,
None,
"IPv6 gateway for router %s NIC is empty" % nic.traffictype)
def getRouterProcessStatus(self, router, cmd):
if router.id not in self.routerDetailsMap or self.routerDetailsMap[router.id] is None:
connect_ip = self.apiclient.connection.mgtSvr
connect_user = self.apiclient.connection.user
connect_passwd = self.apiclient.connection.passwd
hypervisor = self.hypervisor
if self.hypervisor.lower() not in ('vmware', 'hyperv'):
hosts = Host.list(
self.apiclient,
zoneid=router.zoneid,
type='Routing',
state='Up',
id=router.hostid
)
self.assertEqual(
isinstance(hosts, list),
True,
"Check list host returns a valid list"
)
host = hosts[0]
connect_ip = host.ipaddress
hypervisor = None
try:
connect_user, connect_passwd= get_host_credentials(
self.config, host.ipaddress)
except KeyError:
self.skipTest(
"Marvin configuration has no host credentials to\
check router services")
details = {}
details['connect_ip'] = connect_ip
details['connect_user'] = connect_user
details['connect_passwd'] = connect_passwd
details['hypervisor'] = hypervisor
self.routerDetailsMap[router.id] = details
result = get_process_status(
self.routerDetailsMap[router.id]['connect_ip'],
22,
self.routerDetailsMap[router.id]['connect_user'],
self.routerDetailsMap[router.id]['connect_passwd'],
router.linklocalip,
cmd,
hypervisor=self.routerDetailsMap[router.id]['hypervisor']
)
self.assertTrue(type(result) == list and len(result) > 0,
"%s on router %s returned invalid result" % (cmd, router.id))
result = '\n'.join(result)
return result
def getVpcRouter(self, vpc, red_state="PRIMARY"):
routers = Router.list(
self.apiclient,
vpcid=vpc.id,
listall=True
)
self.assertTrue(
isinstance(routers, list) and len(routers) > 0,
"No routers found for VPC %s" % vpc.id
)
if len(routers) == 1:
return routers[0]
for router in routers:
if router.redundantstate == red_state:
return router
def getNetworkGateway(self, network):
ipv6_network = Network.list(self.apiclient,listall="true",id=network.id)
self.assertTrue(
isinstance(ipv6_network, list),
"Check listNetworks response returns a valid list"
)
self.assertEqual(
len(ipv6_network),
1,
"Network not found"
)
ipv6_network = ipv6_network[0]
self.assertNotEqual(ipv6_network.ip6gateway,
None,
"IPv6 gateway for network is empty")
return ipv6_network.ip6gateway
def getNetworkRoutes(self, network):
ipv6_network = Network.list(self.apiclient,listall="true",id=network.id)
self.assertTrue(
isinstance(ipv6_network, list),
"Check listNetworks response returns a valid list"
)
self.assertEqual(
len(ipv6_network),
1,
"Network not found"
)
ipv6_network = ipv6_network[0]
self.assertNotEqual(ipv6_network.ip6routes,
None,
"IPv6 routes for network is empty")
return ipv6_network.ip6routes
def isNetworkEgressDefaultPolicyAllow(self, network):
ipv6_network = Network.list(self.apiclient,listall="true",id=network.id)
if len(ipv6_network) == 1:
ipv6_network = ipv6_network[0]
return ipv6_network.egressdefaultpolicy
return False
def checkRouterNicState(self, router, dev, state):
st = "state %s" % state
cmd = "ip link show %s | grep '%s'" % (dev, st)
res = self.getRouterProcessStatus(router, cmd)
self.assertTrue(type(res) == str and len(res) > 0 and st in res,
"%s failed on router %s" % (cmd, router.id))
def checkIpv6VpcPrimaryRouter(self, router, network_ip6gateway):
self.checkRouterNicState(router, VPC_ROUTER_GUEST_NIC, "UP")
guest_gateway_check_cmd = "ip -6 address show %s | grep 'inet6 %s'" % (VPC_ROUTER_GUEST_NIC, network_ip6gateway)
res = self.getRouterProcessStatus(router, guest_gateway_check_cmd)
self.assertTrue(type(res) == str and len(res) > 0 and network_ip6gateway in res,
"%s failed on router %s" % (guest_gateway_check_cmd, router.id))
self.assertFalse("dadfailed" in res,
"dadfailed for IPv6 guest gateway on router %s" % router.id)
self.checkRouterNicState(router, VPC_ROUTER_PUBLIC_NIC, "UP")
public_ipv6 = None
public_ipv6_gateway = None
nics = router.nic
for nic in nics:
if nic.traffictype == 'Public':
public_ipv6 = nic.ip6address
public_ipv6_gateway = nic.ip6gateway
break
self.assertNotEqual(public_ipv6,
None,
"IPv6 address for router Public NIC is empty")
public_ip_check_cmd = "ip -6 address show %s | grep 'inet6 %s'" % (VPC_ROUTER_PUBLIC_NIC, public_ipv6)
res = self.getRouterProcessStatus(router, public_ip_check_cmd)
self.assertTrue(type(res) == str and len(res) > 0 and public_ipv6 in res,
"%s failed on router %s" % (public_ip_check_cmd, router.id))
self.assertFalse("dadfailed" in res,
"dadfailed for public IPv6 on router %s" % router.id)
self.assertNotEqual(public_ipv6_gateway,
None,
"IPv6 gateway for router Public NIC is empty")
default_route_check_cmd = "ip -6 route | grep 'default via %s'" % (public_ipv6_gateway)
res = self.getRouterProcessStatus(router, default_route_check_cmd)
self.assertTrue(type(res) == str and len(res) > 0 and public_ipv6_gateway in res,
"%s failed on router %s" % (default_route_check_cmd, router.id))
def checkIpv6VpcBackupRouter(self, router, network_ip6gateway):
self.checkRouterNicState(router, VPC_ROUTER_GUEST_NIC, "UP")
guest_gateway_check_cmd = "ip -6 address show %s | grep 'inet6 %s'" % ("eth0", network_ip6gateway)
res = self.getRouterProcessStatus(router, guest_gateway_check_cmd)
self.assertFalse(type(res) == str and len(res) > 0 and network_ip6gateway in res,
"%s failed on router %s" % (guest_gateway_check_cmd, router.id))
self.checkRouterNicState(router, VPC_ROUTER_PUBLIC_NIC, "DOWN")
def checkIpv6VpcRoutersInternal(self):
network_ip6gateway = self.getNetworkGateway(self.network)
for router in self.routers:
if router.state != "Running":
continue
if router.isredundantrouter == True and router.redundantstate == 'BACKUP':
self.checkIpv6VpcBackupRouter(router, network_ip6gateway)
continue
self.checkIpv6VpcPrimaryRouter(router, network_ip6gateway)
def checkIpv6NetworkTierVm(self):
self.debug("Listing NICS for VM %s in network tier: %s" % (self.virtual_machine.name, self.network.name))
nics = NIC.list(
self.apiclient,
virtualmachineid=self.virtual_machine.id,
networkid=self.network.id
)
self.assertEqual(
len(nics),
1,
"VM NIC for the network tier isn't found"
)
nic = nics[0]
self.assertNotEqual(nic.ip6address,
None,
"IPv6 address for VM %s NIC is empty" % nic.traffictype)
self.virtual_machine_ipv6_address = nic.ip6address
self.assertNotEqual(nic.ip6cidr,
None,
"IPv6 CIDR for VM %s NIC is empty" % nic.traffictype)
self.assertNotEqual(nic.ip6gateway,
None,
"IPv6 gateway for VM %s NIC is empty" % nic.traffictype)
def restartVpcWithCleanup(self):
self.vpc.restart(self.apiclient, cleanup=True)
time.sleep(SLEEP_BEFORE_VR_CHANGES)
def updateNetworkTierWithOffering(self):
self.network.update(self.apiclient, networkofferingid=self.network_offering_update.id)
time.sleep(SLEEP_BEFORE_VR_CHANGES)
def deployRoutingTestResources(self):
self.routing_test_vpc = self.deployAllowAllVpcInternal(ROUTE_TEST_VPC_DATA["cidr"])
self.routing_test_network_offering = self.createNetworkTierOfferingInternal(True)
self.routing_test_network = self.deployNetworkTierInternal(
self.routing_test_network_offering.id,
self.routing_test_vpc.id,
ROUTE_TEST_VPC_DATA["tier1_gateway"],
ROUTE_TEST_VPC_DATA["tier_netmask"]
)
self.services["virtual_machine"]["zoneid"] = self.zone.id
self.routing_test_vm = VirtualMachine.create(
self.apiclient,
self.services["virtual_machine"],
templateid=self.template.id,
accountid=self.account.name,
domainid=self.account.domainid,
networkids=[self.routing_test_network.id],
serviceofferingid=self.service_offering.id,
mode="advanced",
vpcid=self.routing_test_vpc.id
)
self.cleanup.append(self.routing_test_vm)
def prepareRoutingTestResourcesInBackground(self):
self.thread = threading.Thread(target=self.deployRoutingTestResources, args=())
self.thread.daemon = True
self.thread.start()
def checkVpcRouting(self):
if not self.thread:
self.deployRoutingTestResources()
else:
self.thread.join(5*60)
self.assertFalse(not self.routing_test_vpc or not self.routing_test_network or not self.routing_test_vm,
"Routing resources failure")
test_vpc_router = self.getVpcRouter(self.routing_test_vpc)
routes = self.getNetworkRoutes(self.network)
self.logger.debug("Adding vpc routes in routing_test_vpc %s" % routes)
for route in routes:
add_route_cmd = "ip -6 route add %s via %s" % (route.subnet, route.gateway)
self.getRouterProcessStatus(test_vpc_router, add_route_cmd)
vpc_router = self.getVpcRouter(self.vpc)
routes = self.getNetworkRoutes(self.routing_test_network)
self.logger.debug("Adding routing_test_vpc routes in vpc %s" % routes)
for route in routes:
add_route_cmd = "ip -6 route add %s via %s" % (route.subnet, route.gateway)
self.getRouterProcessStatus(vpc_router, add_route_cmd)
ping_cmd = "ping6 -c 4 %s" % self.virtual_machine_ipv6_address
count = 0
while count < PING_RETRIES:
count = count + 1
res = self.getRouterProcessStatus(test_vpc_router, ping_cmd)
if " 0% packet loss" in res:
break
time.sleep(PING_SLEEP)
self.assertTrue(" 0% packet loss" in res,
"Ping from router %s of VPC %s to VM %s of VPC %s is unsuccessful" % (test_vpc_router.id, self.routing_test_vpc.id, self.virtual_machine.id, self.vpc.id))
ssh = self.routing_test_vm.get_ssh_client(retries=5)
count = 0
while count < PING_RETRIES:
count = count + 1
res = ssh.execute(ping_cmd)
if type(res) == list and len(res) > 0 and " 0% packet loss" in '\n'.join(res):
break
time.sleep(PING_SLEEP)
self.assertTrue(type(res) == list and len(res) > 0,
"%s on VM %s returned invalid result" % (ping_cmd, self.routing_test_vm.id))
self.logger.debug(res)
res = '\n'.join(res)
self.assertTrue(" 0% packet loss" in res,
"Ping from VM %s of VPC %s to VM %s of VPC %s is unsuccessful" % (self.routing_test_vm.id, self.routing_test_vpc.id, self.virtual_machine.id, self.vpc.id))
def createNetworkAclRule(self, rule, aclid):
return NetworkACL.create(self.apiclient,
services=rule,
aclid=aclid
)
def verifyAclRulesInRouter(self, nic, rules, router):
for rule in rules:
acl_chain = nic + ACL_CHAINS_SUFFIX[rule["traffictype"]]
routerCmd = "nft list chain ip6 %s %s" % (ACL_TABLE, acl_chain)
res = self.getRouterProcessStatus(router, routerCmd)
parsed_rule_new = rule["parsedrule"].replace("{ ", "").replace(" }", "")
self.assertTrue(rule["parsedrule"] in res or parsed_rule_new in res,
"Listing firewall rule with nft list chain failure for rule: '%s' is not in '%s'" % (rule["parsedrule"], res))
def checkIpv6AclRule(self):
router = self.getVpcRouter(self.vpc)
tier1_acl = NetworkACLList.create(
self.apiclient,
services={},
name="tier1_acl",
description="tier1_acl",
vpcid=self.vpc.id
)
rules = []
# Ingress - ip6 saddr SOURCE_CIDR tcp dport { START_PORT-END_PORT } accept
rule = {}
rule["traffictype"] = "Ingress"
rule["cidrlist"] = self.getRandomIpv6Cidr()
rule["protocol"] = "tcp"
rule["startport"] = randint(3000, 5000)
rule["endport"] = rule["startport"] + randint(1, 8)
parsedrule = "ip6 saddr %s %s dport { %d-%d } accept" % (rule["cidrlist"], rule["protocol"], rule["startport"], rule["endport"])
rules.append({"traffictype": rule["traffictype"], "parsedrule": parsedrule})
self.createNetworkAclRule(rule, tier1_acl.id)
# Egress - ip6 daddr DEST_CIDR icmpv6 type TYPE code CODE accept
rule = {}
rule["traffictype"] = "Egress"
rule["cidrlist"] = self.getRandomIpv6Cidr()
rule["protocol"] = "icmp"
rule["icmptype"] = choice(list(ICMPV6_TYPE.keys()))
rule["icmpcode"] = choice(list(ICMPV6_CODE_TYPE.keys()))
parsedrule = "ip6 daddr %s %sv6 type %s %sv6 code %s accept" % (rule["cidrlist"], rule["protocol"], ICMPV6_TYPE[rule["icmptype"]], rule["protocol"], ICMPV6_CODE_TYPE[rule["icmpcode"]])
rules.append({"traffictype": rule["traffictype"], "parsedrule": parsedrule})
self.createNetworkAclRule(rule, tier1_acl.id)
self.network.replaceACLList(self.apiclient, tier1_acl.id)
self.verifyAclRulesInRouter("eth2", rules, router)
tier2_acl = NetworkACLList.create(
self.apiclient,
services={},
name="tier2_acl",
description="tier2_acl",
vpcid=self.vpc.id
)
rules = []
# Ingress - ip6 saddr ::/0 udp dport { 0-65355 } ACTION
rule = {}
rule["traffictype"] = "Ingress"
rule["cidrlist"] = CIDR_IPV6_ANY
rule["protocol"] = "udp"
parsedrule = "ip6 saddr %s %s dport %s accept" % (rule["cidrlist"], rule["protocol"], TCP_UDP_PORT_ANY)
rules.append({"traffictype": rule["traffictype"], "parsedrule": parsedrule})
self.createNetworkAclRule(rule, tier2_acl.id)
# Egress - ip6 daddr DEST_CIDR icmpv6 type TYPE code CODE accept
rule = {}
rule["traffictype"] = "Egress"
rule["protocol"] = "all"
parsedrule = "ip6 daddr %s accept" % (CIDR_IPV6_ANY)
rules.append({"traffictype": rule["traffictype"], "parsedrule": parsedrule})
self.createNetworkAclRule(rule, tier2_acl.id)
self.network_offering_tier2 = self.createNetworkTierOfferingInternal(True, False)
self.tier2_network = self.deployNetworkTierInternal(
self.network_offering_tier2.id,
self.vpc.id,
VPC_DATA["tier2_gateway"],
VPC_DATA["tier_netmask"],
tier2_acl.id,
"tier2"
)
self.tier2_vm = self.deployNetworkTierVmInternal(self.tier2_network.id)
self.verifyAclRulesInRouter("eth3", rules, router)
def checkVpcVRRedundancy(self):
network_ip6gateway = self.getNetworkGateway(self.network)
primary_router = self.getVpcRouter(self.vpc)
Router.stop(
self.apiclient,
id=primary_router.id
)
time.sleep(self.services["sleep"]/2)
new_primary_router = self.getVpcRouter(self.vpc)
self.assertNotEqual(new_primary_router.id, primary_router.id,
"Original primary router ID: %s of VPC is still the primary router after stopping" % (primary_router.id))
self.checkIpv6VpcPrimaryRouter(new_primary_router, network_ip6gateway)
def checkIpv6VpcNetworking(self, check_vm=False):
self.checkIpv6Vpc()
self.checkIpv6NetworkTierBasic()
self.checkIpv6VpcRoutersBasic()
self.checkIpv6VpcRoutersInternal()
if check_vm:
self.checkIpv6NetworkTierVm()
@attr(
tags=[
"advanced",
"basic",
"eip",
"sg",
"advancedns",
"smoke"],
required_hardware="false")
@skipTestIf("ipv6NotSupported")
def test_01_verify_ipv6_vpc(self):
"""Test to verify IPv6 VPC
# Validate the following:
# 1. Create IPv6 VPC, add tiers, deploy VM
# 2. Verify VPC, tier has required IPv6 details
# 3. List router for the VPC and verify it has required IPv6 details for Guest and Public NIC of the VR
# 4. SSH into VR(s) and verify correct details are present for its NICs
# 5. Verify VM in network tier has required IPv6 details
# 6. Restart VPC with cleanup and re-verify VPC networking
# 7. Update network tier with a new offering and re-verify VPC networking
# 8. Deploy another IPv6 VPC with tier and check routing between two VPC and their VM
# 9. Create IPv6 ACL rules in two different VPC tiers and verify in VR if they get implemented correctly
"""
self.createIpv6VpcOffering()
self.deployVpc()
self.createIpv6NetworkTierOffering()
self.createIpv6NetworkTierOfferingForUpdate()
self.createTinyServiceOffering()
self.deployNetworkTier()
self.deployNetworkTierVm()
self.checkIpv6VpcNetworking(True)
self.prepareRoutingTestResourcesInBackground()
self.restartVpcWithCleanup()
self.checkIpv6VpcNetworking()
self.updateNetworkTierWithOffering()
self.checkIpv6VpcNetworking()
self.checkVpcRouting()
self.checkIpv6AclRule()