blob: 49678c6385c0173d0945192d900666015daf94ca [file] [log] [blame]
#!/usr/bin/env python
# encoding: utf-8
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
"""This module contains functions for handling requests in relation to security
groups
"""
from ec2stack import helpers, errors
from ec2stack.providers import cloudstack
from ec2stack.core import Ec2stackError
from ec2stack.providers.cloudstack import requester
@helpers.authentication_required
def authenticate_security_group_egress():
"""
Add egress rules to a security group.
@return: Response.
"""
rule_type = 'egress'
response = _authenticate_security_group_request(rule_type)
return _authenticate_security_group_response(response, rule_type)
def _authenticate_security_group_request(rule_type):
"""
Request to add an egress rule to a security group.
@param rule_type: The type of rule to add.
@return: Response.
"""
args = _parse_security_group_request()
if rule_type == 'egress':
args['command'] = 'authorizeSecurityGroupEgress'
elif rule_type == 'ingress':
args['command'] = 'authorizeSecurityGroupIngress'
response = requester.make_request_async(args)
return response
def _authenticate_security_group_response(response, rule_type):
"""
Generate a response for authenticate security group request.
@param response: Cloudstack response.
@param rule_type: The type of rule to add.
@raise Ec2stackError: If authorize security group fails.
@return: Response
"""
if 'errortext' in response:
if 'Failed to authorize security group' in response['errortext']:
cidrlist = str(helpers.get('CidrIp'))
protocol = str(helpers.get('IpProtocol'))
from_port = str(helpers.get('FromPort'))
to_port = str(helpers.get('toPort'))
raise Ec2stackError(
'400',
'InvalidPermission.Duplicate',
'the specified rule "peer: ' + cidrlist + ', ' + protocol +
', from port: ' + from_port + ', to port: ' + to_port +
', ALLOW" already exists'
)
elif 'Unable to find security group' in response['errortext']:
errors.invalid_security_group()
else:
errors.invalid_request(response['errortext'])
else:
if rule_type == 'ingress':
rule_type = 'AuthorizeSecurityGroupIngressResponse'
elif rule_type == 'egress':
rule_type = 'AuthorizeSecurityGroupEgressResponse'
return {
'template_name_or_list': 'status.xml',
'response_type': rule_type,
'return': 'true'
}
@helpers.authentication_required
def create_security_group():
"""
Create a security group.
@return: Response.
"""
helpers.require_parameters(['GroupName', 'GroupDescription'])
response = _create_security_group_request()
return _create_security_group_response(response)
def _create_security_group_request():
"""
Request to create a security group.
@return: response.
"""
args = {'command': 'createSecurityGroup', 'name': helpers.get('GroupName'),
'description': helpers.get('GroupDescription')}
response = requester.make_request(args)
response = response['createsecuritygroupresponse']
return response
def _create_security_group_response(response):
"""
Generate a response for create security group request.
@param response: Cloudstack response.
@return: Response.
"""
if 'errortext' in response:
errors.duplicate_security_group()
else:
response = response['securitygroup']
return {
'template_name_or_list': 'create_security_group.xml',
'response_type': 'CreateSecurityGroupResponse',
'id': response['id'],
'return': 'true'
}
@helpers.authentication_required
def delete_security_group():
"""
Deletes a specified security group.
@return: Response.
"""
_delete_security_group_request()
return _delete_security_group_response()
def _delete_security_group_request():
"""
Request to delete a security group.
@return: Response.
"""
args = {}
helpers.require_atleast_one_parameter(['GroupName', 'GroupId'])
if helpers.contains_parameter('GroupName'):
args['name'] = helpers.get('GroupName')
elif helpers.contains_parameter('GroupId'):
args['id'] = helpers.get('GroupId')
args['command'] = 'deleteSecurityGroup'
response = requester.make_request(args)
return response
def _delete_security_group_response():
"""
Generate a response for delete security group request.
@return: response
"""
return {
'template_name_or_list': 'status.xml',
'response_type': 'DeleteSecurityGroupResponse',
'return': 'true'
}
@helpers.authentication_required
def describe_security_groups():
"""
Describe one or more security groups.
@return: Response
"""
args = {'command': 'listSecurityGroups'}
response = cloudstack.describe_item(
args, 'securitygroup', errors.invalid_security_group, 'Group'
)
return _describe_security_groups_response(
response
)
def _describe_security_groups_response(response):
"""
Generates a response for describe security group request.
@param response: Cloudstack response.
@return: Response.
"""
return {
'template_name_or_list': 'securitygroups.xml',
'response_type': 'DescribeSecurityGroupsResponse',
'response': response
}
@helpers.authentication_required
def authenticate_security_group_ingress():
"""
Add one or more ingress rules to a security group.
@return: Response.
"""
rule_type = 'ingress'
response = _authenticate_security_group_request(rule_type)
return _authenticate_security_group_response(response, rule_type)
@helpers.authentication_required
def revoke_security_group_ingress():
"""
Removes one or more ingress rules from a security group.
@return: Response.
"""
rule_type = 'ingress'
_revoke_security_group_request(rule_type)
return _revoke_security_group_response(rule_type)
@helpers.authentication_required
def revoke_security_group_egress():
"""
Removes one or more egress rules from a security group.
@return: Response.
"""
rule_type = 'egress'
_revoke_security_group_request(rule_type)
return _revoke_security_group_response(rule_type)
def _revoke_security_group_request(rule_type):
"""
Request to remove rule from security group
@param rule_type: The type of rule to remove.
@return: Response.
"""
args = {}
rules = _parse_security_group_request()
if rule_type == 'ingress':
args['command'] = 'revokeSecurityGroupIngress'
args['id'] = _find_rule(rules, 'ingressrule')
elif rule_type == 'egress':
args['command'] = 'revokeSecurityGroupEgress'
args['id'] = _find_rule(rules, 'egressrule')
response = requester.make_request_async(args)
return response
def _revoke_security_group_response(rule_type):
"""
Generate a response for revoke security group requests.
@param rule_type: The type of rule
@return: Response.
"""
if rule_type == 'ingress':
rule_type = 'RevokeSecurityGroupIngressResponse'
elif rule_type == 'egress':
rule_type = 'RevokeSecurityGroupEgressResponse'
return {
'template_name_or_list': 'status.xml',
'response_type': rule_type,
'return': 'true'
}
def _find_rule(rule, rule_type):
"""
Searches a Cloudstack response for a rule and returns its Id.
@param rule: Rule to be found.
@param rule_type: Type of rule.
@return: Id of the rule.
"""
security_group = _get_security_group(rule)
if rule_type in security_group:
found_rules = security_group[rule_type]
for found_rule in found_rules:
if _compare_rules(rule, found_rule):
return found_rule['ruleid']
errors.invalid_permission()
def _compare_rules(left, right):
"""
Compares two rules to see if they are the same.
@param left: rule to be compared.
@param right: rule to compare with.
@return: Boolean
"""
protocol_match = str(left['protocol']) == str(right['protocol'])
cidr_match = str(left['cidrlist']) == str(right['cidr'])
if 'startport' in left and 'startport' in right:
startport_match = str(left['startport']) == str(right['startport'])
elif 'icmptype' in left and 'icmptype' in right:
startport_match = str(left['icmptype']) == str(right['icmptype'])
else:
startport_match = False
if 'endport' in left and 'endport' in right:
endport_match = str(left['endport']) == str(right['endport'])
elif 'icmpcode' in left and 'icmpcode' in right:
endport_match = str(left['icmpcode']) == str(right['icmpcode'])
else:
endport_match = False
return protocol_match and cidr_match and startport_match and endport_match
def _get_security_group(args):
"""
Get the security group with the specified name.
@param args: Arguments to pass to request.
@return: Response.
"""
args['command'] = 'listSecurityGroups'
response = cloudstack.describe_item_request(
args, 'securitygroup', errors.invalid_security_group
)
return response
def _parse_security_group_request(args=None):
"""
Parse the request parameters into a Cloudstack request payload.
@param args: Arguments to include in the request.
@return: Request payload.
"""
if args is None:
args = {}
helpers.require_atleast_one_parameter(['GroupName', 'GroupId'])
if helpers.contains_parameter('GroupName'):
args['securityGroupName'] = helpers.get('GroupName')
args['name'] = helpers.get('GroupName')
elif helpers.contains_parameter('GroupId'):
args['securityGroupId'] = helpers.get('GroupId')
args['id'] = helpers.get('GroupId')
helpers.require_parameters(['IpProtocol'])
args['protocol'] = helpers.get('IpProtocol')
helpers.require_parameters(['FromPort', 'ToPort'])
if args['protocol'] in ['icmp']:
args['icmptype'] = helpers.get('FromPort')
args['icmpcode'] = helpers.get('ToPort')
else:
args['startport'] = helpers.get('FromPort')
args['endport'] = helpers.get('ToPort')
if helpers.get('CidrIp') is None:
args['cidrlist'] = '0.0.0.0/0'
else:
args['cidrlist'] = helpers.get('CidrIp')
return args