blob: 6a1a98641a233de8003bfa4c4164919077eabb65 [file] [log] [blame]
import json
import inspect
from typing import Any, List
from .response import ResponseBody
class CommandExecutor:
def __init__(self, enforcer, command_name, args):
"""Initialize the command executor"""
self.enforcer = enforcer
self.command_name = command_name
self.args = args
def execute(self):
"""Execute the command and return the result in JSON format"""
try:
# Method name mapping: Java style -> Python style
method_mapping = {
'enforceEx': 'enforce_ex',
'enforceExWithMatcher': 'enforce_ex_with_matcher',
'batchEnforce': 'batch_enforce',
'getAllSubjects': 'get_all_subjects',
'getAllObjects': 'get_all_objects',
'getAllActions': 'get_all_actions',
'getAllRoles': 'get_all_roles',
'getAllNamedSubjects': 'get_all_named_subjects',
'getAllNamedObjects': 'get_all_named_objects',
'getAllNamedActions': 'get_all_named_actions',
'getAllNamedRoles': 'get_all_named_roles',
'addPolicy': 'add_policy',
'removePolicy': 'remove_policy',
'updatePolicy': 'update_policy',
'addGroupingPolicy': 'add_grouping_policy',
'removeGroupingPolicy': 'remove_grouping_policy',
'updateGroupingPolicy': 'update_grouping_policy',
'addNamedPolicy': 'add_named_policy',
'removeNamedPolicy': 'remove_named_policy',
'addNamedPolicies': 'add_named_policies',
'removeNamedPolicies': 'remove_named_policies',
'addNamedGroupingPolicy': 'add_named_grouping_policy',
'removeNamedGroupingPolicy': 'remove_named_grouping_policy',
'addNamedGroupingPolicies': 'add_named_grouping_policies',
'removeNamedGroupingPolicies': 'remove_named_grouping_policies',
'removeFilteredPolicy': 'remove_filtered_policy',
'removeFilteredNamedPolicy': 'remove_filtered_named_policy',
'removeFilteredGroupingPolicy': 'remove_filtered_grouping_policy',
'removeFilteredNamedGroupingPolicy': 'remove_filtered_named_grouping_policy',
'hasPolicy': 'has_policy',
'hasNamedPolicy': 'has_named_policy',
'hasGroupingPolicy': 'has_grouping_policy',
'hasNamedGroupingPolicy': 'has_named_grouping_policy',
'getPolicy': 'get_policy',
'getNamedPolicy': 'get_named_policy',
'getGroupingPolicy': 'get_grouping_policy',
'getNamedGroupingPolicy': 'get_named_grouping_policy',
'getFilteredPolicy': 'get_filtered_policy',
'getFilteredNamedPolicy': 'get_filtered_named_policy',
'getFilteredGroupingPolicy': 'get_filtered_grouping_policy',
'getFilteredNamedGroupingPolicy': 'get_filtered_named_grouping_policy',
'getRolesForUser': 'get_roles_for_user',
'getUsersForRole': 'get_users_for_role',
'hasRoleForUser': 'has_role_for_user',
'addRoleForUser': 'add_role_for_user',
'deleteRoleForUser': 'delete_role_for_user',
'deleteRolesForUser': 'delete_roles_for_user',
'deleteUser': 'delete_user',
'deleteRole': 'delete_role',
'deletePermission': 'delete_permission',
'addPermissionForUser': 'add_permission_for_user',
'deletePermissionForUser': 'delete_permission_for_user',
'deletePermissionsForUser': 'delete_permissions_for_user',
'getPermissionsForUser': 'get_permissions_for_user',
'hasPermissionForUser': 'has_permission_for_user',
'getImplicitRolesForUser': 'get_implicit_roles_for_user',
'getImplicitPermissionsForUser': 'get_implicit_permissions_for_user',
'getImplicitUsersForRole': 'get_implicit_users_for_role',
'addPolicies': 'add_policies',
}
actual_method_name = method_mapping.get(self.command_name, self.command_name)
if not hasattr(self.enforcer, actual_method_name):
available_methods = [method for method in dir(self.enforcer) if not method.startswith('_')]
raise AttributeError(f"Method '{actual_method_name}' not found. Available methods: {available_methods[:10]}...")
method = getattr(self.enforcer, actual_method_name)
# Convert arguments based on method signature
converted_args = self._convert_arguments(method, self.args)
# Execute method
result = method(*converted_args)
# Build response with standardized format
response = ResponseBody()
# Process result based on return type
if isinstance(result, bool):
response.allow = result
response.explain = None
elif isinstance(result, tuple) and len(result) == 2:
# Handle enforce_ex return format: (boolean, list)
response.allow = result[0]
response.explain = result[1]
elif isinstance(result, list):
response.allow = None
response.explain = result
elif hasattr(result, 'allow') and hasattr(result, 'explain'):
# Handle EnforceResult type
response.allow = result.allow
response.explain = result.explain
else:
response.allow = None
response.explain = result
# Save policy for modification operations
modification_operations = [
'addPolicy', 'removePolicy', 'updatePolicy',
'addGroupingPolicy', 'removeGroupingPolicy', 'updateGroupingPolicy',
'addNamedPolicy', 'removeNamedPolicy', 'addNamedPolicies', 'removeNamedPolicies',
'addNamedGroupingPolicy', 'removeNamedGroupingPolicy', 'addNamedGroupingPolicies',
'removeNamedGroupingPolicies', 'removeFilteredPolicy', 'removeFilteredNamedPolicy',
'removeFilteredGroupingPolicy', 'removeFilteredNamedGroupingPolicy',
'updateNamedGroupingPolicy', 'addRoleForUser', 'deleteRoleForUser', 'deleteRolesForUser',
'deleteUser', 'deleteRole', 'deletePermission', 'addPermissionForUser',
'deletePermissionForUser', 'deletePermissionsForUser'
]
if self.command_name in modification_operations:
self.enforcer.save_policy()
# Return JSON response with consistent formatting
return json.dumps(response.to_dict(), separators=(',', ':'), ensure_ascii=False)
except Exception as e:
import sys
if hasattr(sys, '_called_from_test') or 'pytest' in sys.modules:
raise Exception(f"Error executing command '{self.command_name}': {str(e)}")
else:
raise Exception(f"Error executing command '{self.command_name}': {str(e)}")
def _convert_arguments(self, method, args: List[str]) -> List[Any]:
"""Convert string arguments to appropriate types based on method signature"""
if not args:
return []
converted = []
# Handle special cases for specific method signatures
if self.command_name == 'batchEnforce':
#print(f"DEBUG: Input args: {args}")
#print(f"DEBUG: Args length: {len(args)}")
batch_requests = []
for arg in args:
#print(f"DEBUG: Processing arg {i}: '{arg}'")
split_result = arg.split(',')
#print(f"DEBUG: Split result: {split_result}")
batch_requests.append(arg.split(','))
#print(f"DEBUG: Final batch_requests: {batch_requests}")
return batch_requests
'''
if self.command_name == 'batchEnforce':
# Convert comma-separated strings to lists for batch operations
print(f"DEBUG: Original args: {self.args}")
batch_requests = []
for arg in args:
if ',' in arg:
batch_requests.append(arg.split(','))
print(f"DEBUG: Converted requests: {batch_requests}")
try:
result = self.enforcer.batch_enforce(batch_requests)
print(f"DEBUG: Batch enforce result: {result}")
return batch_requests
except Exception as e:
print(f"DEBUG: Error in batch_enforce: {e}")
raise
#else:
# batch_requests.append([arg])
#return batch_requests
'''
# Handle methods with matcher parameter
if self.command_name in ['enforceWithMatcher', 'enforceExWithMatcher']:
# First argument is matcher string, rest are regular parameters
converted.append(args[0]) # matcher
converted.extend(args[1:]) # other parameters
return converted
# Handle JSON object parameters
for i, arg in enumerate(args):
if arg and arg.strip().startswith('{'):
try:
converted.append(json.loads(arg))
except json.JSONDecodeError:
converted.append(arg)
elif arg and ',' in arg and self._should_split_as_list(self.command_name, i):
# Split comma-separated values for list parameters
converted.append(arg.split(','))
else:
# Handle type conversion for specific parameter types
converted_arg = self._convert_single_argument(arg)
converted.append(converted_arg)
return converted
def _should_split_as_list(self, method_name: str, arg_index: int) -> bool:
"""Determine if an argument should be split into a list based on method and position"""
# Define methods that expect list parameters at specific positions
list_methods = {
'addPolicies': [0],
'removePolicies': [0],
'addNamedPolicies': [1],
'removeNamedPolicies': [1],
'addGroupingPolicies': [0],
'removeGroupingPolicies': [0],
'addNamedGroupingPolicies': [1],
'removeNamedGroupingPolicies': [1]
}
return method_name in list_methods and arg_index in list_methods[method_name]
def _convert_single_argument(self, arg: str) -> Any:
"""Convert a single string argument to appropriate type"""
if arg is None:
return None
# Try to convert to integer
try:
return int(arg)
except ValueError:
pass
# Try to convert to boolean
if arg.lower() in ['true', 'false']:
return arg.lower() == 'true'
# Return as string
return arg