security(file_svc): containment check on save_file to block agent-contact path traversal (#3380)
* security(file_svc): containment check on save_file to block path traversal
file_svc.save_file builds its target path as os.path.join(target_dir, filename)
and writes payload bytes there with no validation of the resulting location.
It is reached from the agent contact handlers with an attacker-controlled
filename:
* app/contacts/contact_ftp.py:217 — FTP submit_uploaded_file
* app/contacts/contact_dns.py:491 — DNS _submit_uploaded_file
* app/contacts/contact_gist.py:212 — Gist contact upload
* app/contacts/contact_slack.py:206 — Slack contact upload
A filename of e.g. '../../data/object_store' escapes the exfil tree and lets
an attacker write arbitrary bytes to any path the server has permission to
write. On the next restart, data_svc.restore_state calls pickle.loads on the
contents of 'data/object_store', which turns that write primitive into RCE on
the team server. The file is Fernet-encrypted with the configured
encryption_key, but with the documented default encryption_key=ADMIN123 a
remote attacker can forge a valid blob.
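
As an illustration of the escape described above, the following sketch shows where an unchecked join lands. The directory layout (/srv/caldera/exfil/agent1) and the helper name join_unchecked are assumptions made for this example only; the real exfil directory is configurable and may sit at a different depth.

```python
import posixpath


def join_unchecked(target_dir, filename):
    """Join as the pre-fix code did, then normalise the result to show
    where the OS would resolve the path (symlinks are ignored here)."""
    return posixpath.normpath(posixpath.join(target_dir, filename))


# Hypothetical layout, for illustration only.
TARGET_DIR = '/srv/caldera/exfil/agent1'
EVIL_NAME = '../../data/object_store'

landed = join_unchecked(TARGET_DIR, EVIL_NAME)
# True when the normalised path is no longer below TARGET_DIR.
escaped = not landed.startswith(TARGET_DIR + '/')
print(landed, escaped)
```

With this assumed layout, the write ends up next to the exfil tree rather than inside it, which is the condition the containment check is meant to reject.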
save_file has two legitimate call modes:
(1) target_dir + basename — sink mode used by data_svc, app_svc, c_operation,
base_knowledge_svc, and the four agent contact handlers above.
(2) '' + relative-path — path mode used by ability_api_manager,
base_api_manager, and rest_svc, which assemble paths from sanitised IDs.
A simple "filename must be a basename" check breaks mode (2). Instead, resolve
the final path with realpath and require it to stay under its parent (the
resolved target_dir, or the working directory when target_dir is empty). This:
* rejects '..' / absolute-path escape in BOTH call modes, with one check;
* leaves all in-tree callers' happy paths intact (object_store, contact
reports, operation logs, ability/source YAML writes);
* provides defense-in-depth for any future caller, not just the four contact
handlers known to be vulnerable today.
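
The check described above can be sketched as a standalone function. The names check_contained and is_rejected are hypothetical and exist only for this example; the logic mirrors the realpath comparison in this change but is not the service code itself.

```python
import os
import tempfile


def check_contained(filename, target_dir):
    """Return the joined path, or raise ValueError if it resolves
    outside its parent directory."""
    full_path = os.path.join(target_dir, filename)
    # An empty target_dir (path mode) is anchored at the working directory.
    parent = os.path.realpath(target_dir) if target_dir else os.path.realpath(os.getcwd())
    final = os.path.realpath(full_path)
    # Appending os.sep stops a sibling such as '<parent>-evil' from
    # passing a bare prefix comparison.
    if final != parent and not final.startswith(parent + os.sep):
        raise ValueError('path %r escapes parent %r' % (final, parent))
    return full_path


def is_rejected(filename, target_dir):
    """Convenience wrapper for the examples: True if the check raises."""
    try:
        check_contained(filename, target_dir)
    except ValueError:
        return True
    return False
```

Under these assumptions, a call such as check_contained('report.bin', sink) returns the joined path, while is_rejected('../escape.bin', sink) is True for any ordinary directory sink. One property of this sketch worth noting: if the parent resolves to the filesystem root, parent + os.sep becomes '//' on POSIX and every file is rejected, so it assumes the parent is never '/'.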
Includes a regression test that exercises three traversal vectors and
confirms each is rejected before any bytes are written.
Reported externally on 2026-05-18 alongside the broader audit pass.
* test(file_svc): fix path-traversal regression test guard
The post-condition assertion 'not os.path.exists(realpath(...))' used
'/etc/passwd' as one of the resolved-traversal targets. That file exists
on every Linux runner, so the guard would fail even when the production
code correctly raised ValueError before any I/O — what we saw in CI.
The security check (pytest.raises(ValueError, match='escapes parent'))
is unchanged; it's the only assertion that proves the fix works. The
post-condition guard is now scoped to canary basenames that cannot
pre-exist by accident, so it actually exercises 'rejected before write'
without depending on system-file absence.
---------
Co-authored-by: deacon-mp <mperry@mitre.org>
diff --git a/app/service/file_svc.py b/app/service/file_svc.py
index ae032b4..c8d3196 100644
--- a/app/service/file_svc.py
+++ b/app/service/file_svc.py
@@ -74,7 +74,22 @@
     async def save_file(self, filename, payload, target_dir, encrypt=True, encoding=None):
         if encoding:
             payload = await self._decode_contents(payload, encoding)
-        self._save(os.path.join(target_dir, filename), payload, encrypt)
+        full_path = os.path.join(target_dir, filename)
+        # Containment check — the resolved final path must not escape its parent.
+        # save_file has two legitimate call modes:
+        #   (1) target_dir + basename — sink mode, used by agent contact handlers
+        #       (DNS, FTP, Gist, Slack) that take filename from upload metadata.
+        #   (2) '' + relative path — path mode, used by ability/source managers
+        #       that assemble paths from sanitised IDs.
+        # Either mode is broken by a filename containing '..'. Resolving the final
+        # path and checking it stays under the parent rejects both classes with one
+        # check, without needing callers to know which mode they are using.
+        parent = os.path.realpath(target_dir) if target_dir else os.path.realpath(os.getcwd())
+        final = os.path.realpath(full_path)
+        if final != parent and not final.startswith(parent + os.sep):
+            raise ValueError('save_file: path %r escapes parent %r (filename=%r, target_dir=%r)'
+                             % (final, parent, filename, target_dir))
+        self._save(full_path, payload, encrypt)

     async def create_exfil_sub_directory(self, dir_name):
         path = os.path.join(self.get_config('exfil_dir'), dir_name)
diff --git a/tests/services/test_file_svc.py b/tests/services/test_file_svc.py
index 8871093..97080d8 100644
--- a/tests/services/test_file_svc.py
+++ b/tests/services/test_file_svc.py
@@ -45,6 +45,32 @@
         with open(file_location, "r") as file_contents:
             assert payload.decode("utf-8") == file_contents.read()

+    def test_save_file_rejects_path_traversal(self, event_loop, file_svc, tmp_path):
+        # save_file is reachable from the agent contact handlers (DNS, FTP, Gist,
+        # Slack) with an attacker-controlled filename. A '../../...' basename
+        # must NOT be allowed to escape target_dir, regardless of the encryption
+        # setting. Regression test for the unauthenticated file-write primitive
+        # that ended in pickle.loads on the next restart.
+        payload = b'attacker bytes'
+        # Avoid filesystem-resident paths like /etc/passwd as targets — the
+        # post-condition guard would false-positive on any pre-existing system
+        # file. Use names that cannot pre-exist by accident.
+        traversal_attempts = [
+            '../caldera-traversal-canary-1.bin',
+            '../../caldera-traversal-canary-2.bin',
+            '../../../../tmp/caldera-traversal-canary-3.bin',
+        ]
+        for evil in traversal_attempts:
+            with pytest.raises(ValueError, match='escapes parent'):
+                event_loop.run_until_complete(
+                    file_svc.save_file(evil, payload, str(tmp_path), encrypt=False)
+                )
+            # Defense-in-depth: confirm the canary file the test name reserves
+            # wasn't actually written (proves save_file rejected BEFORE the I/O,
+            # not just that it raised after writing).
+            resolved = os.path.realpath(os.path.join(str(tmp_path), evil))
+            assert not os.path.exists(resolved), 'save_file wrote to %s before raising' % resolved
+
     def test_create_exfil_sub_directory(self, event_loop, file_svc):
         exfil_dir_name = 'unit-testing-Rocks'
         new_dir = event_loop.run_until_complete(file_svc.create_exfil_sub_directory(exfil_dir_name))