blob: 7b742b02150231b0023d321fad996d8856ab8f2d [file]
import pytest
from aiohttp import web
from app.service.app_svc import AppService
from app.service.auth_svc import AuthService, CONFIG_API_KEY_RED
from app.service.file_svc import FileSvc
from app.service.data_svc import DataService
from app.service.event_svc import EventService
from app.service.contact_svc import ContactService
from app.utility.base_service import BaseService
from app.utility.base_world import BaseWorld
from app.api.v2.handlers.fact_api import FactApi
from app.api.v2.responses import json_request_validation_middleware
from app.api.v2.security import authentication_required_middleware_factory
from app.objects.secondclass.c_fact import WILDCARD_STRING
from app.service.knowledge_svc import KnowledgeService
from app.objects.c_operation import Operation
pytestmark = pytest.mark.asyncio
from app.objects.secondclass.c_link import Link
from app.objects.secondclass.c_fact import Fact
from app.objects.c_ability import Ability
from app.objects.c_agent import Agent
from app.objects.secondclass.c_executor import Executor
from datetime import datetime, timezone
from app.objects.c_planner import Planner
from app.objects.c_adversary import Adversary
from app.objects.c_objective import Objective
cakr = 'abc123'
headers = {'key': 'abc123', 'Content-Type': 'application/json'}
# @pytest.fixture
# def base_world():
# BaseWorld.apply_config(
# name='main',
# config={
# CONFIG_API_KEY_RED: cakr,
# 'users': {
# 'red': {'reduser': 'redpass', 'api_key': cakr},
# 'blue': {'blueuser': 'bluepass', 'api_key': 'def456'}
# },
# 'crypt_salt': 'thisisdefinitelynotkosher', # Salt for file service instantiation
# 'encryption_key': 'andneitheristhis', # fake encryption key for file service instantiation
# }
# )
# yield BaseWorld
# BaseWorld.clear_config()
@pytest.fixture
async def knowledge_webapp(data_svc):
BaseWorld.apply_config(
name='main',
config={
CONFIG_API_KEY_RED: cakr,
'users': {
'red': {'reduser': 'redpass', 'api_key': cakr},
'blue': {'blueuser': 'bluepass', 'api_key': 'def456'}
},
'crypt_salt': 'thisisdefinitelynotkosher',
'encryption_key': 'andneitheristhis',
}
)
app_svc = AppService(web.Application())
app_svc.add_service('auth_svc', AuthService())
app_svc.add_service('knowledge_svc', KnowledgeService())
app_svc.add_service('data_svc', DataService())
app_svc.add_service('event_svc', EventService())
app_svc.add_service('contact_svc', ContactService())
app_svc.add_service('file_svc', FileSvc()) # This needs to be done this way, or it we won't have a valid BaseWorld
services = app_svc.get_services()
app = web.Application(
middlewares=[
authentication_required_middleware_factory(services['auth_svc']),
json_request_validation_middleware
]
)
FactApi(services).add_routes(app)
await app_svc.register_contacts()
return app
async def test_display_facts(knowledge_webapp, aiohttp_client, fire_event_mock):
client = await aiohttp_client(knowledge_webapp)
fact_data = {
'trait': 'demo',
'value': 'test'
}
await client.post('/facts', json=fact_data, headers=headers)
resp = await client.get('/facts', json=fact_data, headers=headers)
data = await resp.json()
response = data['found']
assert len(response) == 1
assert response[0]['trait'] == 'demo'
assert response[0]['value'] == 'test'
assert response[0]['source'] == WILDCARD_STRING
async def test_display_operation_facts(knowledge_webapp, aiohttp_client, fire_event_mock):
client = await aiohttp_client(knowledge_webapp)
op_id_test = 'this_is_a_valid_operation_id'
fact_data = {
'trait': 'demo',
'value': 'test',
'source': op_id_test
}
await client.post('/facts', json=fact_data, headers=headers)
resp = await client.get(f'/facts/{op_id_test}', headers=headers)
data = await resp.json()
response = data['found']
assert len(response) == 1
assert response[0]['trait'] == 'demo'
assert response[0]['value'] == 'test'
assert response[0]['source'] == op_id_test
async def test_display_relationships(knowledge_webapp, aiohttp_client, fire_event_mock):
client = await aiohttp_client(knowledge_webapp)
op_id_test = 'this_is_a_valid_operation_id'
fact_data_a = {
'trait': 'a',
'value': '1',
}
fact_data_b = {
'trait': 'b',
'value': '2'
}
relationship_data = {
'source': fact_data_a,
'edge': 'gamma',
'target': fact_data_b,
'origin': op_id_test
}
await client.post('/relationships', json=relationship_data, headers=headers)
resp = await client.get('/relationships', json=relationship_data, headers=headers)
data = await resp.json()
response = data['found']
assert len(response) == 1
assert response[0]['source']['trait'] == 'a'
assert response[0]['source']['value'] == '1'
assert response[0]['edge'] == 'gamma'
assert response[0]['origin'] == 'this_is_a_valid_operation_id'
assert response[0]['source']['source'] == 'this_is_a_valid_operation_id'
async def test_display_operation_relationships(knowledge_webapp, aiohttp_client, fire_event_mock):
client = await aiohttp_client(knowledge_webapp)
op_id_test = 'this_is_a_valid_operation_id'
fact_data_a = {
'trait': 'a',
'value': '1',
'source': op_id_test
}
fact_data_b = {
'trait': 'b',
'value': '2',
'source': op_id_test
}
relationship_data = {
'source': fact_data_a,
'edge': 'gamma',
'target': fact_data_b,
'origin': op_id_test
}
await client.post('/relationships', json=relationship_data, headers=headers)
resp = await client.get(f'/relationships/{op_id_test}', headers=headers)
data = await resp.json()
response = data['found']
assert len(response) == 1
assert response[0]['source']['trait'] == fact_data_a['trait']
assert response[0]['source']['value'] == fact_data_a['value']
assert response[0]['target']['trait'] == fact_data_b['trait']
assert response[0]['target']['value'] == fact_data_b['value']
assert response[0]['edge'] == relationship_data['edge']
assert response[0]['origin'] == op_id_test
assert response[0]['source']['source'] == op_id_test
assert response[0]['target']['source'] == op_id_test
async def test_remove_fact(knowledge_webapp, aiohttp_client, fire_event_mock):
client = await aiohttp_client(knowledge_webapp)
fact_data = {
'trait': 'demo',
'value': 'test'
}
init = await client.post('/facts', json=fact_data, headers=headers)
pre = await init.json()
subs = await client.delete('/facts', json=fact_data, headers=headers)
post = await subs.json()
tmp = await client.get('/facts', params=fact_data, headers=headers)
cur = await tmp.json()
current = cur['found']
start = pre['added']
end = post['removed']
assert len(start) == 1
assert len(end) == 1
assert len(current) == 0
assert start == end
async def test_remove_relationship(knowledge_webapp, aiohttp_client, fire_event_mock):
client = await aiohttp_client(knowledge_webapp)
op_id_test = 'this_is_a_valid_operation_id'
fact_data_a = {
'trait': 'a',
'value': '1',
}
fact_data_b = {
'trait': 'b',
'value': '2'
}
relationship_data = {
'source': fact_data_a,
'edge': 'alpha',
'target': fact_data_b,
'origin': op_id_test
}
init = await client.post('/relationships', json=relationship_data, headers=headers)
pre = await init.json()
subs = await client.delete('/relationships', json=dict(edge='alpha'), headers=headers)
post = await subs.json()
resp = await client.get('/relationships', json=relationship_data, headers=headers)
cur = await resp.json()
start = pre['added']
end = post['removed']
current = cur['found']
assert len(start) == 1
assert len(end) == 1
assert len(current) == 0
assert start == end
async def test_add_fact(knowledge_webapp, aiohttp_client, fire_event_mock):
client = await aiohttp_client(knowledge_webapp)
fact_data = {
'trait': 'demo',
'value': 'test'
}
resp = await client.post('/facts', json=fact_data, headers=headers)
data = await resp.json()
response = data['added']
assert len(response) == 1
assert response[0]['trait'] == 'demo'
assert response[0]['value'] == 'test'
tmp = await client.get('/facts', json=fact_data, headers=headers)
cur = await tmp.json()
current = cur['found']
assert current == response
async def test_add_fact_to_operation(knowledge_webapp, aiohttp_client, fire_event_mock):
client = await aiohttp_client(knowledge_webapp)
data_svc = BaseService.get_service('data_svc')
# STEP 1: Store ability
ability = Ability(
ability_id='123',
name='Test Ability',
tactic='discovery',
technique_id='T1003',
technique_name='Credential Dumping',
platform='linux',
executor='sh'
)
await data_svc.store(ability)
# STEP 2: Store agent
agent = Agent(
paw='123456',
server='http://localhost:8888',
trusted=True,
platform='linux',
architecture='x86_64',
executors=['sh']
)
await data_svc.store(agent)
# STEP 3: Create operation
op = Operation(name='testop', autonomous=False, state='running')
op.start = datetime.now(timezone.utc)
op_id = op.id
# STEP 4: Add link
link = Link(command='whoami', paw='123456', ability={'id': '123'}, status=-1)
link.facts = [Fact(trait='demo', value='test', source=op_id)]
op.add_link(link)
# STEP 5: Add planner, adversary, objective
planner = Planner(name='testplanner', module='atomic', params={})
await data_svc.store(planner)
adversary = Adversary(adversary_id='adv-123', name='testadv', description='test')
adversary.atomic_ordering = ['123']
await data_svc.store(adversary)
objective = Objective(name='testobj', description='desc', goals=[])
await data_svc.store(objective)
op.planner = planner
op.adversary = adversary
op.objective = objective
op.state = 'running'
await data_svc.store(op)
# ✅ STEP 6: Add fact via API BEFORE marking operation finished
fact_data = {
'trait': 'demo',
'value': 'test',
'source': op_id
}
resp = await client.post('/facts', json=fact_data, headers=headers)
assert resp.status == 200, await resp.text()
response = (await resp.json())['added']
assert len(response) == 1
assert response[0]['trait'] == 'demo'
assert response[0]['value'] == 'test'
assert response[0]['source'] == op_id
# STEP 7: Verify fact exists via GET
tmp = await client.get('/facts', json=fact_data, headers=headers)
assert tmp.status == 200, await tmp.text()
current = (await tmp.json())['found']
assert current == response
# ✅ STEP 8: Mark operation finished AFTER all fact submissions
op.state = 'finished'
await data_svc.store(op)
# STEP 9: Generate and validate report
file_svc = BaseService.get_service('file_svc')
matched_operations = await data_svc.locate('operations', {'id': op_id})
assert matched_operations, "Expected operation to be found, got none"
try:
report = await matched_operations[0].report(file_svc, data_svc)
except Exception as e:
print(f"Error generating report: {e}")
report = None
assert report is not None, "Expected operation report to be generated, got None"
assert 'facts' in report, "Expected 'facts' key in report"
found = any(
f.trait == response[0]['trait'] and
f.value == response[0]['value'] and
f.source == response[0]['source']
for f in report['facts']
)
assert found, 'Fact from response not found in report facts'
async def test_add_relationship(knowledge_webapp, aiohttp_client, fire_event_mock):
client = await aiohttp_client(knowledge_webapp)
fact_data_a = {
'trait': 'a',
'value': '1',
}
fact_data_b = {
'trait': 'b',
'value': '2'
}
relationship_data = {
'source': fact_data_a,
'edge': 'tango',
'target': fact_data_b
}
expected_response = f"{fact_data_a['trait']}({fact_data_a['value']}) : " \
f"tango : {fact_data_b['trait']}({fact_data_b['value']})"
resp = await client.post('/relationships', json=relationship_data, headers=headers)
data = await resp.json()
response = data['added']
assert len(response) == 1
assert response[0]['source']['trait'] == fact_data_a['trait']
assert response[0]['target']['value'] == fact_data_b['value']
assert response[0]['edge'] == 'tango'
assert response[0]['source']['relationships'] == response[0]['target']['relationships']
assert response[0]['source']['relationships'][0] == expected_response
resp = await client.get('/relationships', json=relationship_data, headers=headers)
cur = await resp.json()
current = cur['found']
assert current == response
async def test_patch_fact(knowledge_webapp, aiohttp_client, fire_event_mock):
client = await aiohttp_client(knowledge_webapp)
fact_data = {
'trait': 'domain.user.name',
'value': 'thomas'
}
patch_data = {
"criteria": {
"trait": "domain.user.name",
"value": "thomas"},
"updates": {
"value": "jacobson"
}
}
await client.post('/facts', json=fact_data, headers=headers)
resp = await client.patch('/facts', json=patch_data, headers=headers)
message = await resp.json()
patched = message['updated']
assert len(patched) == 1
assert patched[0]['value'] == 'jacobson'
tmp = await client.get('/facts', json=dict(trait='domain.user.name'), headers=headers)
cur = await tmp.json()
current = cur['found']
assert len(current) == 1
assert patched == current
async def test_patch_relationship(knowledge_webapp, aiohttp_client, fire_event_mock):
client = await aiohttp_client(knowledge_webapp)
relationship_data = {
"source": {
"trait": "domain.user.name",
"value": "bobross"
},
"edge": "has_password",
"target": {
"trait": "domain.user.password",
"value": "12345"
}
}
patch_data = {
"criteria": {
"edge": "has_password",
"source": {
"value": "bobross"
}
},
"updates": {
"target": {
"value": "54321"
},
"edge": "has_admin_password"
}
}
await client.post('/relationships', json=relationship_data, headers=headers)
resp = await client.patch('/relationships', json=patch_data, headers=headers)
message = await resp.json()
patched = message['updated']
assert len(patched) == 1
assert patched[0]['target']['value'] == '54321'
assert patched[0]['source']['value'] == 'bobross'
assert patched[0]['edge'] == 'has_admin_password'
tmp = await client.get('/relationships', json=dict(edge='has_admin_password'), headers=headers)
cur = await tmp.json()
current = cur['found']
assert len(current) == 1
assert patched == current