blob: 3839258f251bbf1f2a1994d7d6afc346f858983b [file]
from app.objects.secondclass.c_fact import Fact, OriginType
from app.objects.secondclass.c_relationship import Relationship
from app.objects.secondclass.c_rule import Rule
from app.objects.secondclass.c_link import Link
class TestKnowledgeService:
    """Exercise storage, lookup, update and removal on the knowledge service.

    Each test receives ``knowledge_svc`` (and in some cases ``fire_event_mock``,
    ``event_svc``, ``ability`` or ``executor``) as arguments. These are
    presumably pytest fixtures defined outside this file -- confirm against
    the project's conftest.
    """

    async def test_no_duplicate_fact(self, knowledge_svc, fire_event_mock):
        """Submitting two equal facts leaves exactly one fact stored."""
        # A fresh Fact is built on every pass so that two distinct-but-equal
        # objects are submitted, not the same instance twice.
        for _ in range(2):
            await knowledge_svc.add_fact(
                Fact(trait='test', value='demo', score=1,
                     collected_by=['thin_air'], technique_id='T1234')
            )
        stored = await knowledge_svc.get_facts({'trait': 'test'})
        assert len(stored) == 1

    async def test_no_duplicate_rules(self, knowledge_svc):
        """Submitting two equal rules leaves exactly one rule stored."""
        for _ in range(2):
            await knowledge_svc.add_rule(Rule(action='BLOCK', trait='a.c', match='.*'))
        stored = await knowledge_svc.get_rules({'trait': 'a.c'})
        assert len(stored) == 1

    async def test_no_duplicate_relationship(self, knowledge_svc):
        """Submitting two equal relationships leaves exactly one stored."""
        endpoint = Fact(trait='test', value='demo', score=1,
                        collected_by=['thin_air'], technique_id='T1234')
        # The same fact serves as both source and target of each relationship.
        for _ in range(2):
            await knowledge_svc.add_relationship(
                Relationship(source=endpoint, edge='potato', target=endpoint)
            )
        stored = await knowledge_svc.get_relationships({'edge': 'potato'})
        assert len(stored) == 1

    async def test_remove_fact(self, knowledge_svc, event_svc, fire_event_mock):
        """Deleting by trait removes only the matching fact and its constraints."""
        await knowledge_svc.add_fact(
            Fact(trait='rtest', value='rdemo', score=1,
                 collected_by=['thin_air'], technique_id='T1234'),
            constraints={'test_field': 'test_value'},
        )
        # Second fact shares the value but not the trait, so it must survive.
        await knowledge_svc.add_fact(
            Fact(trait='ktest', value='rdemo', score=1,
                 collected_by=['thin_air'], technique_id='T1234')
        )
        await knowledge_svc.delete_fact({'trait': 'rtest'})

        remaining = await knowledge_svc.get_facts({'value': 'rdemo'})
        assert len(remaining) == 1
        # Inspect the loaded knowledge module directly to confirm that the
        # constraints registered with the deleted fact are gone as well.
        backend = knowledge_svc._KnowledgeService__loaded_knowledge_module
        assert len(backend.fact_ram['constraints']) == 0

    async def test_remove_rules(self, knowledge_svc):
        """Deleting a rule by trait removes the rule and its constraints."""
        await knowledge_svc.add_rule(
            Rule(action='rBLOCK', trait='ra.c', match='.*'),
            constraints={'test_field': 'test_value'},
        )
        await knowledge_svc.delete_rule({'trait': 'ra.c'})

        remaining = await knowledge_svc.get_rules({'trait': 'ra.c'})
        assert len(remaining) == 0
        backend = knowledge_svc._KnowledgeService__loaded_knowledge_module
        assert len(backend.fact_ram['constraints']) == 0

    async def test_remove_relationship(self, knowledge_svc):
        """Deleting a relationship by edge removes it and its constraints."""
        endpoint = Fact(trait='rtest', value='rdemo', score=1,
                        collected_by=['thin_air'], technique_id='T1234')
        await knowledge_svc.add_relationship(
            Relationship(source=endpoint, edge='rpotato', target=endpoint),
            constraints={'test_field': 'test_value'},
        )
        await knowledge_svc.delete_relationship({'edge': 'rpotato'})

        remaining = await knowledge_svc.get_relationships({'edge': 'rpotato'})
        assert len(remaining) == 0
        backend = knowledge_svc._KnowledgeService__loaded_knowledge_module
        assert len(backend.fact_ram['constraints']) == 0

    async def test_update_fact(self, knowledge_svc, fire_event_mock):
        """Updating trait and value makes the fact retrievable under the new trait."""
        await knowledge_svc.add_fact(
            Fact(trait='utest', value='udemo', score=1,
                 collected_by=['thin_air'], technique_id='T1234')
        )
        await knowledge_svc.update_fact(
            criteria={'trait': 'utest'},
            updates={'trait': 'utest2', 'value': 'udemo2'},
        )
        updated = await knowledge_svc.get_facts({'trait': 'utest2'})
        assert len(updated) == 1
        assert updated[0].value == 'udemo2'

    async def test_update_relationship(self, knowledge_svc):
        """Updating source and edge makes the relationship retrievable by the new edge."""
        original_fact = Fact(trait='utest', value='udemo', score=1,
                             collected_by=['thin_air'], technique_id='T1234')
        replacement_fact = Fact(trait='utest2', value='udemo2', score=1,
                                collected_by=['thin_air'], technique_id='T4321')
        await knowledge_svc.add_relationship(
            Relationship(source=original_fact, edge='upotato', target=original_fact)
        )
        await knowledge_svc.update_relationship(
            criteria={'edge': 'upotato'},
            updates={'source': replacement_fact, 'edge': 'ubacon'},
        )
        updated = await knowledge_svc.get_relationships({'edge': 'ubacon'})
        assert len(updated) == 1
        assert updated[0].source == replacement_fact

    async def test_retrieve_fact(self, knowledge_svc, fire_event_mock):
        """A trait query returns only the matching fact, with its display data intact."""
        await knowledge_svc.add_fact(
            Fact(trait='ttestA', value='tdemoB', score=24,
                 collected_by=['thin_airA'], technique_id='T1234')
        )
        await knowledge_svc.add_fact(
            Fact(trait='ttestB', value='tdemoA', score=42,
                 collected_by=['thin_airB'], technique_id='T4321')
        )
        matches = await knowledge_svc.get_facts({'trait': 'ttestB'})
        assert len(matches) == 1

        shown = matches[0].display
        assert shown['value'] == 'tdemoA'
        assert shown['score'] == 42

    async def test_retrieve_relationship(self, knowledge_svc):
        """Relationships can be queried by edge and narrowed by source fact."""
        first = Fact(trait='ttest', value='tdemo', score=1,
                     collected_by=['thin_air'], technique_id='T1234')
        second = Fact(trait='ttest2', value='tdemo2', score=1,
                      collected_by=['thin_air'], technique_id='T1234')
        # Two relationships on the same edge, pointing in opposite directions.
        await knowledge_svc.add_relationship(Relationship(source=first, edge='tpotato', target=second))
        await knowledge_svc.add_relationship(Relationship(source=second, edge='tpotato', target=first))

        by_edge = await knowledge_svc.get_relationships({'edge': 'tpotato'})
        assert len(by_edge) == 2

        by_source = await knowledge_svc.get_relationships({'source': first})
        assert len(by_source) == 1
        shown = by_source[0].display
        assert shown['edge'] == 'tpotato'
        assert shown['target'].trait == 'ttest2'

    async def test_retrieve_rule(self, knowledge_svc):
        """Rule lookup by trait alone, and by trait plus several match values."""
        await knowledge_svc.add_rule(Rule(action='tBLOCK', trait='ta.d', match='4.5.*'))
        await knowledge_svc.add_rule(Rule(action='tALLOW', trait='ta.d', match='*.5.*'))

        by_trait = await knowledge_svc.get_rules({'trait': 'ta.d'})
        assert len(by_trait) == 2

        # '4.5.6' is expected to be matched by both stored patterns.
        both_hit = await knowledge_svc.get_rules({'trait': 'ta.d', 'match': '4.5.6'})
        assert len(both_hit) == 2

        # '6.5.4' is expected to be matched only by the tALLOW pattern.
        allow_only = await knowledge_svc.get_rules({'trait': 'ta.d', 'match': '6.5.4'})
        assert len(allow_only) == 1
        assert allow_only[0].action == 'tALLOW'

        # The query value may itself carry a wildcard; both rules are expected back.
        wildcard_query = await knowledge_svc.get_rules({'trait': 'ta.d', 'match': '5.*'})
        assert len(wildcard_query) == 2

    async def test_fact_origin(self, knowledge_svc, ability, executor):
        """``get_fact_origin`` reports the link id for learned facts and the source for seeded ones."""
        shell_executor = executor(name='sh', platform='darwin', command='mkdir test', cleanup='rm -rf test')
        sample_ability = ability(ability_id='123', executors=[shell_executor], repeatable=True, buckets=['test'])
        link = Link.load({
            'command': '',
            'paw': 'n1234',
            'ability': sample_ability,
            'executor': next(sample_ability.executors),
            'status': 0,
            'id': 'ganymede',
        })

        learned_a = Fact(trait='t1', value='d1', score=1, collected_by=['thin_air'], technique_id='T1234',
                         links=[link.id], origin_type=OriginType.LEARNED)
        learned_b = Fact(trait='t2', value='d2', score=1, collected_by=['thin_air'], technique_id='T1234',
                         links=[link.id], origin_type=OriginType.LEARNED)
        seeded = Fact(trait='t3', value='d3', score=1, collected_by=['tiny_lightning_bolts_running_through_sand'],
                      technique_id='T1234', origin_type=OriginType.SEEDED, source="Europa")
        for fact in (learned_a, learned_b, seeded):
            await knowledge_svc.add_fact(fact)

        # The first lookup passes the Fact object itself; the other two pass only the trait string.
        origin_a, kind_a = await knowledge_svc.get_fact_origin(learned_a)
        origin_b, kind_b = await knowledge_svc.get_fact_origin(learned_b.trait)
        origin_seeded, kind_seeded = await knowledge_svc.get_fact_origin(seeded.trait)

        assert origin_a == link.id
        assert origin_b == link.id
        assert origin_seeded == 'Europa'
        assert kind_a == OriginType.LEARNED
        assert kind_b == OriginType.LEARNED
        assert kind_seeded == OriginType.SEEDED