security(file_svc): containment check on save_file to block agent-contact path traversal (#3380)

* security(file_svc): containment check on save_file to block path traversal

file_svc.save_file builds its target path as os.path.join(target_dir, filename)
and writes payload bytes there with no validation of the resulting location.
It is reached from the agent contact handlers with an attacker-controlled
filename:

  * app/contacts/contact_ftp.py:217  — FTP submit_uploaded_file
  * app/contacts/contact_dns.py:491  — DNS _submit_uploaded_file
  * app/contacts/contact_gist.py:212 — Gist contact upload
  * app/contacts/contact_slack.py:206 — Slack contact upload

A filename of e.g. '../../data/object_store' escapes the exfil tree and lets
an attacker write arbitrary bytes to any path the server has permission to
write. data_svc.restore_state runs pickle.loads on 'data/object_store' at the
next restart, which turns that write primitive into RCE on the team server. The
file is Fernet-encrypted with conf encryption_key, but with the documented
default encryption_key=ADMIN123 a remote attacker can forge a valid blob.
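
To make the escape concrete, here is a minimal sketch of the unchecked join.
The directory layout ('/opt/caldera/tmp/exfil') and the helper name
joined_target are assumptions for illustration, not the server's actual paths;
posixpath is used so the result is the same on any platform.

```python
import posixpath


def joined_target(target_dir, filename):
    """Mimic the old behaviour: join the two parts, then normalise the result."""
    return posixpath.normpath(posixpath.join(target_dir, filename))


# Hypothetical exfil directory, chosen only for this illustration.
target_dir = '/opt/caldera/tmp/exfil'

benign = joined_target(target_dir, 'report.txt')
escaped = joined_target(target_dir, '../../data/object_store')
# An absolute filename makes join() discard target_dir entirely.
absolute = joined_target(target_dir, '/etc/cron.d/job')

for label, path in (('benign', benign), ('dot-dot', escaped), ('absolute', absolute)):
    print('%-8s -> %s (inside target_dir: %s)'
          % (label, path, path.startswith(target_dir + '/')))
```

Only the first path stays inside target_dir; the other two land wherever the
filename points, which is the write primitive described above.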

save_file has two legitimate call modes:
  (1) target_dir + basename — sink mode used by data_svc, app_svc, c_operation,
      base_knowledge_svc, and the four agent contact handlers above.
  (2) ''           + relative-path — path mode used by ability_api_manager,
      base_api_manager, and rest_svc, which assemble paths from sanitised IDs.

A simple "filename must be a basename" check breaks mode (2). Instead, resolve
the final path with realpath and require it to stay under its parent: the
resolved target_dir, or the working directory when target_dir is empty. This:

  * rejects '..' / absolute-path escape in BOTH call modes, with one check;
  * leaves all in-tree callers' happy paths intact (object_store, contact
    reports, operation logs, ability/source YAML writes);
  * provides defense-in-depth for any future caller, not just the four contact
    handlers known to be vulnerable today.
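
The behaviour listed above can be tried in isolation with the sketch below.
resolve_contained and is_allowed are hypothetical stand-alone helpers that
mirror the check in this change; they are not part of file_svc. The demo
switches into a temporary directory so that path mode resolves against a known
working directory.

```python
import os
import tempfile


def resolve_contained(target_dir, filename):
    """Return the resolved write path, or raise ValueError if it escapes.

    Hypothetical helper mirroring the containment check in save_file.
    """
    full_path = os.path.join(target_dir, filename)
    # An empty target_dir (path mode) is anchored at the working directory.
    parent = os.path.realpath(target_dir or os.getcwd())
    final = os.path.realpath(full_path)
    if final != parent and not final.startswith(parent + os.sep):
        raise ValueError('path %r escapes parent %r' % (final, parent))
    return final


def is_allowed(target_dir, filename):
    """True when the check accepts the path, False when it rejects it."""
    try:
        resolve_contained(target_dir, filename)
        return True
    except ValueError:
        return False


with tempfile.TemporaryDirectory() as sink:
    previous_cwd = os.getcwd()
    os.chdir(sink)  # path mode is resolved relative to this directory
    try:
        outcomes = {
            'sink mode, basename': is_allowed(sink, 'report.txt'),
            'sink mode, dot-dot': is_allowed(sink, os.path.join('..', 'escape.bin')),
            'sink mode, absolute': is_allowed(sink, os.sep + 'escape.bin'),
            'path mode, relative': is_allowed('', os.path.join('abilities', 'demo.yml')),
            'path mode, dot-dot': is_allowed('', os.path.join('..', 'escape.yml')),
        }
    finally:
        os.chdir(previous_cwd)  # restore before the temp dir is removed

for label, allowed in outcomes.items():
    print('%-20s %s' % (label, 'allowed' if allowed else 'rejected'))
```

One property of the prefix test worth knowing: if the parent ever resolves to
the filesystem root, parent + os.sep is a doubled separator and every path is
rejected, so the check fails closed rather than open.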

Includes a regression test that exercises three traversal vectors and
confirms each is rejected before any bytes are written.

Reported externally on 2026-05-18 alongside the broader audit pass.

* test(file_svc): fix path-traversal regression test guard

The post-condition assertion 'not os.path.exists(realpath(...))' used
'/etc/passwd' as one of the resolved-traversal targets. That file exists
on every Linux runner, so the guard would fail even when the production
code correctly raised ValueError before any I/O, which is what we saw in CI.

The security check (pytest.raises(ValueError, match='escapes parent'))
is unchanged; it's the only assertion that proves the fix works. The
post-condition guard is now scoped to canary basenames that cannot
pre-exist by accident, so it actually exercises 'rejected before write'
without depending on system-file absence.

---------

Co-authored-by: deacon-mp <mperry@mitre.org>
diff --git a/app/service/file_svc.py b/app/service/file_svc.py
index ae032b4..c8d3196 100644
--- a/app/service/file_svc.py
+++ b/app/service/file_svc.py
@@ -74,7 +74,22 @@
     async def save_file(self, filename, payload, target_dir, encrypt=True, encoding=None):
         if encoding:
             payload = await self._decode_contents(payload, encoding)
-        self._save(os.path.join(target_dir, filename), payload, encrypt)
+        full_path = os.path.join(target_dir, filename)
+        # Containment check — the resolved final path must not escape its parent.
+        # save_file has two legitimate call modes:
+        #   (1) target_dir + basename — sink mode, used by agent contact handlers
+        #       (DNS, FTP, Gist, Slack) that take filename from upload metadata.
+        #   (2) ''  + relative path  — path mode, used by ability/source managers
+        #       that assemble paths from sanitised IDs.
+        # Either mode is broken by a filename containing '..'. Resolving the final
+        # path and checking it stays under the parent rejects both classes with one
+        # check, without needing callers to know which mode they are using.
+        parent = os.path.realpath(target_dir) if target_dir else os.path.realpath(os.getcwd())
+        final = os.path.realpath(full_path)
+        if final != parent and not final.startswith(parent + os.sep):
+            raise ValueError('save_file: path %r escapes parent %r (filename=%r, target_dir=%r)'
+                             % (final, parent, filename, target_dir))
+        self._save(full_path, payload, encrypt)
 
     async def create_exfil_sub_directory(self, dir_name):
         path = os.path.join(self.get_config('exfil_dir'), dir_name)
diff --git a/tests/services/test_file_svc.py b/tests/services/test_file_svc.py
index 8871093..97080d8 100644
--- a/tests/services/test_file_svc.py
+++ b/tests/services/test_file_svc.py
@@ -45,6 +45,32 @@
         with open(file_location, "r") as file_contents:
             assert payload.decode("utf-8") == file_contents.read()
 
+    def test_save_file_rejects_path_traversal(self, event_loop, file_svc, tmp_path):
+        # save_file is reachable from the agent contact handlers (DNS, FTP, Gist,
+        # Slack) with an attacker-controlled filename. A '../../...' filename
+        # must NOT be allowed to escape target_dir, regardless of the encryption
+        # setting. Regression test for the unauthenticated file-write primitive
+        # that ended in pickle.loads on the next restart.
+        payload = b'attacker bytes'
+        # Avoid filesystem-resident paths like /etc/passwd as targets — the
+        # post-condition guard would false-positive on any pre-existing system
+        # file. Use names that cannot pre-exist by accident.
+        traversal_attempts = [
+            '../caldera-traversal-canary-1.bin',
+            '../../caldera-traversal-canary-2.bin',
+            '../../../../tmp/caldera-traversal-canary-3.bin',
+        ]
+        for evil in traversal_attempts:
+            with pytest.raises(ValueError, match='escapes parent'):
+                event_loop.run_until_complete(
+                    file_svc.save_file(evil, payload, str(tmp_path), encrypt=False)
+                )
+            # Defense-in-depth: confirm the canary file was never written
+            # (proves save_file rejected the path BEFORE any I/O, rather
+            # than raising only after the write had happened).
+            resolved = os.path.realpath(os.path.join(str(tmp_path), evil))
+            assert not os.path.exists(resolved), 'save_file wrote to %s before raising' % resolved
+
     def test_create_exfil_sub_directory(self, event_loop, file_svc):
         exfil_dir_name = 'unit-testing-Rocks'
         new_dir = event_loop.run_until_complete(file_svc.create_exfil_sub_directory(exfil_dir_name))