blob: 4b221ebb282cd3c4f1804363bf8215bfad711745 [file]
# asvs_bundle
#
# Audits multiple ASVS requirements against a SHARED file scope in a single
# Opus deep-analysis call, instead of N independent calls (one per section)
# all re-reading the same code.
#
# This is the T4 win from optimization-plan.md. When discovery groups
# sections into a "pass" sharing the same files, calling this agent once
# replaces N separate asvs_audit calls.
#
# Returns a JSON envelope:
# {
# "mode": "bundled",
# "asvs_sections": ["5.1.1", "5.1.2", ...],
# "per_section": {
# "5.1.1": {
# "report": "<markdown>",
# "findings": {"Critical": N, "High": N, "Medium": N, "Low": N},
# "files_analyzed": N, "files_total": N, "files_skipped": N
# },
# ...
# },
# "raw_consolidated": "<full markdown before splitting>"
# }
#
# The orchestrator splits per_section[*].report into individual files for
# pushing to GitHub, while asvs_consolidate still sees one report per
# section as before.
#
# Optimizations applied within this agent (cross-ref optimization-plan.md):
# T2 — opus_semaphore raised from 2 to 4 (env-overridable)
# T5 — inventory cache keyed by file-set hash, not by ASVS section
# T8 — single-pass consolidation when ≤4 batch results
# T9 — Step 2 (relevance) uses Haiku 4.5 instead of Sonnet
#
# Input:
# {
# "namespaces": ["files:owner/repo"],
# "asvs_sections": ["5.1.1", "5.1.2", "5.1.3"], # required, must be a list
# "includeFiles": ["src/auth/**", ...], # optional
# "domainContext": "...", # optional
# "severityThreshold": "MEDIUM", # optional
# "falsePositiveGuidance": ["..."] # optional
# }
from agent_factory.remote_mcp_client import RemoteMCPClient
from services.llm_service import call_llm
import httpx
async def run(input_dict, tools):
mcpc = { url : RemoteMCPClient(remote_url = url) for url in tools.keys() }
http_client = httpx.AsyncClient()
try:
def _short_asvs_summary(desc, max_chars=300):
"""Extract a clean one-line summary from the multi-line ASVS context.
The asvs_description string is built by joining several labeled lines
(`Description:`, `Level:`, `Section:`, `Section Description:`). For the
Executive Summary we only want the `Description:` line, truncated at a
word boundary so we don't end mid-word.
"""
if not desc:
return ""
for line in str(desc).split("\n"):
if line.startswith("Description: "):
text = line[len("Description: "):].strip()
if len(text) <= max_chars:
return text
cutoff = text.rfind(" ", 0, max_chars)
if cutoff < 0:
cutoff = max_chars
return text[:cutoff].rstrip(",.;:") + "\u2026"
# Fallback: first non-blank line
for line in str(desc).split("\n"):
line = line.strip()
if line:
if len(line) <= max_chars:
return line
cutoff = line.rfind(" ", 0, max_chars)
if cutoff < 0:
cutoff = max_chars
return line[:cutoff].rstrip(",.;:") + "\u2026"
return ""
def _count_findings(content):
"""Count findings by severity in a report body.
Strategy: Finding ID format is the strongest, most consistent signal
across model outputs (`ASVS-{section}-{SEV}-NNN`). We extract those
and classify by severity token. Falls back to severity headings or
inline `**Severity**:` lines if no IDs are present.
Handles all observed formats:
- `#### MEDIUM` / `### [HIGH]` / `## Critical` (any heading depth)
- `**Finding ID:** ASVS-221-MED-001` (CRIT/CRITICAL, MED/MEDIUM)
- `**Severity:** Medium` / `**Severity**: High`
"""
import re
counts = {"Critical": 0, "High": 0, "Medium": 0, "Low": 0, "Info": 0}
# Primary: count Finding IDs and classify each by its severity token
finding_ids = re.findall(
r'ASVS-\d+-(CRIT(?:ICAL)?|HIGH|MED(?:IUM)?|LOW|INFO(?:RMATIONAL)?)-\d+',
content, re.IGNORECASE,
)
for sev_token in finding_ids:
token_upper = sev_token.upper()
if token_upper.startswith("CRIT"):
counts["Critical"] += 1
elif token_upper == "HIGH":
counts["High"] += 1
elif token_upper.startswith("MED"):
counts["Medium"] += 1
elif token_upper == "LOW":
counts["Low"] += 1
elif token_upper.startswith("INFO"):
counts["Info"] += 1
if sum(counts.values()) > 0:
return counts
# Fallback 1: severity headings at any heading depth, with or without brackets
for sev_name, key in [("critical", "Critical"), ("high", "High"),
("medium", "Medium"), ("low", "Low"),
("info", "Info"), ("informational", "Info")]:
pattern = rf'(?im)^#{{1,6}}\s*\[?\s*{sev_name}\s*\]?\s*$'
counts[key] += len(re.findall(pattern, content))
if sum(counts.values()) > 0:
return counts
# Fallback 2: inline **Severity**: X lines
counts["Critical"] += len(re.findall(r'\*\*Severity:?\*\*:?\s*Critical', content, re.IGNORECASE))
counts["High"] += len(re.findall(r'\*\*Severity:?\*\*:?\s*High', content, re.IGNORECASE))
counts["Medium"] += len(re.findall(r'\*\*Severity:?\*\*:?\s*Medium', content, re.IGNORECASE))
counts["Low"] += len(re.findall(r'\*\*Severity:?\*\*:?\s*Low', content, re.IGNORECASE))
counts["Info"] += len(re.findall(r'\*\*Severity:?\*\*:?\s*Info(?:rmational)?', content, re.IGNORECASE))
return counts
def _split_bundled_output(consolidated_analysis, asvs_sections, asvs_descriptions,
repo_name, audit_date, n_relevant, n_total, skipped):
"""Split bundled Opus output into per-section reports.
Opus is instructed to produce `## ASVS-{section}:` headers per section.
We split on those, attach the cross-cutting tail to each section's report,
and produce a fully-formed markdown report per section.
"""
import re
per_section = {}
tail_match = re.search(
r'(##\s*Cross-cutting Architecture Observations[\s\S]*)',
consolidated_analysis,
)
cross_cutting_tail = tail_match.group(1) if tail_match else ""
body = consolidated_analysis
if tail_match:
body = consolidated_analysis[:tail_match.start()]
section_pattern = re.compile(
r'##\s*ASVS-(\d+(?:\.\d+)*)[:\s][^\n]*\n([\s\S]*?)(?=##\s*ASVS-\d|\Z)',
re.MULTILINE,
)
found_sections = {}
for m in section_pattern.finditer(body):
sid = m.group(1)
block = m.group(0)
found_sections[sid] = block
_bundle_label = asvs_sections[0] if asvs_sections else "?"
for sid in asvs_sections:
section_body = found_sections.get(sid)
if section_body is None:
print(f"[bundle {_bundle_label}] WARNING: no bundled output for section {sid} — emitting empty report", flush=True)
section_body = (
f"## ASVS-{sid}\n\n"
f"_No findings produced by the bundled analysis. This may indicate "
f"the section is not applicable to the audited file scope._\n"
)
findings_count = _count_findings(section_body)
report = _format_section_report(
sid, asvs_descriptions.get(sid, f"ASVS Requirement {sid}"),
repo_name, audit_date, n_relevant, n_total, skipped,
findings_count, section_body, cross_cutting_tail,
)
per_section[sid] = {
"report": report,
"findings": findings_count,
"files_analyzed": n_relevant,
"files_total": n_total,
"files_skipped": skipped,
}
return per_section
def _format_section_report(asvs, asvs_description, repo_name, audit_date,
n_relevant, n_total, skipped,
findings_count, body, cross_cutting_tail):
return f"""# Security Audit Report: ASVS {asvs}
## Executive Summary
| Field | Value |
|-------|-------|
| **Repository** | {repo_name} |
| **Audit Date** | {audit_date} |
| **Auditor** | Tooling Agents (bundled-pass mode) |
| **ASVS Requirement** | ASVS {asvs} |
| **Files Analyzed** | {n_relevant} relevant / {n_total} total |
| **Files Skipped** | {skipped} |
**Requirement description:** {_short_asvs_summary(asvs_description)}
### Findings Overview
| Severity | Count |
|----------|-------|
| 🔴 Critical | {findings_count['Critical']} |
| 🟠 High | {findings_count['High']} |
| 🟡 Medium | {findings_count['Medium']} |
| 🟢 Low | {findings_count['Low']} |
---
{body}
{cross_cutting_tail}
"""
def _empty_section_report(asvs, asvs_description, repo_name, audit_date,
n_relevant, n_total, skipped):
return f"""# Security Audit Report: ASVS {asvs}
## Executive Summary
| Field | Value |
|-------|-------|
| **Repository** | {repo_name} |
| **Audit Date** | {audit_date} |
| **Auditor** | Tooling Agents (bundled-pass mode) |
| **ASVS Requirement** | ASVS {asvs} |
| **Files Analyzed** | {n_relevant} relevant / {n_total} total |
| **Files Skipped** | {skipped} |
**Requirement description:** {_short_asvs_summary(asvs_description)}
### Findings Overview
| Severity | Count |
|----------|-------|
| 🔴 Critical | 0 |
| 🟠 High | 0 |
| 🟡 Medium | 0 |
| 🟢 Low | 0 |
## Result
No relevant files found for ASVS requirement {asvs} within the audited scope.
"""
async def _single_pass_consolidate(results, asvs_description, provider, model, params):
"""T8: One Sonnet call merges ≤4 batch results — no multi-round loop."""
prompt = f"""You are consolidating multiple security audit batch results into a single unified analysis.
ASVS Requirements being audited:
{asvs_description}
NOTE: This is a bundled multi-section audit. The output MUST preserve the
per-section structure (## ASVS-{{section}}: ... headers). Do NOT merge findings
across different ASVS sections — only deduplicate WITHIN a section.
## Consolidation Rules:
1. DEDUPLICATE — Within each section, merge findings describing the same vulnerability
2. CHECK CONTRADICTIONS — If something appears as a positive pattern in ANY batch, remove from findings
3. VERIFY DATA ORIGINS — Remove findings where source is database/config without user injection path
4. CONSISTENT SEVERITY — Ensure similar issues have the same severity
5. REMOVE OUT-OF-SCOPE — Exclude test files, dev scripts
6. PRESERVE SPECIFICS — Keep all exact code references, line numbers, and technical details
Consolidate these analysis results into a single report with the per-section structure intact.
BATCH RESULTS TO CONSOLIDATE:
""" + "\n---\n".join(results)
messages = [{"role": "user", "content": prompt}]
consolidation_params = {**params, "max_tokens": 32000}
try:
resp, _ = await call_llm(
provider=provider, model=model,
messages=messages, parameters=consolidation_params, timeout=600,
)
return resp
except Exception as e:
# call_llm has exhausted its centralized retries. Fall
# back to raw-joining the batch results rather than
# losing them entirely — the consolidated structure is
# degraded but the findings are preserved for downstream
# processing.
print(f" Single-pass consolidation failed, joining raw: {e}", flush=True)
return "\n\n---\n\n".join(results)
async def _multi_round_consolidate(results, asvs_description, provider, model, params, context_window):
"""Original multi-round behavior, kicks in only for >4 batch results."""
template = f"""You are consolidating multiple security audit batch results into a single unified analysis.
ASVS Requirements:
{asvs_description}
NOTE: This is a bundled multi-section audit. Preserve per-section structure
(## ASVS-{{section}}: ... headers). Deduplicate WITHIN sections only.
## Consolidation Rules:
1. DEDUPLICATE - Merge findings describing the same vulnerability within a section
2. CHECK CONTRADICTIONS - If something appears as a positive pattern in ANY batch, remove from findings
3. VERIFY DATA ORIGINS - Remove findings where source is database/config without user injection path
4. CONSISTENT SEVERITY - Ensure similar issues have same severity
5. REMOVE OUT-OF-SCOPE - Exclude test files, dev scripts
6. VERIFY COMPLETENESS - For each vulnerability, confirm related functions were checked
BATCH RESULTS TO CONSOLIDATE:
"""
consolidation_params = {**params, "max_tokens": 16384}
template_tokens = count_tokens(template, provider, model)
max_cons_content = int(context_window * 0.40) - template_tokens
consolidation_round = 0
MAX_ROUNDS = 5
batch_results = list(results)
while len(batch_results) > 1:
consolidation_round += 1
prev_count = len(batch_results)
next_level = []
group = []
group_tokens = 0
for result in batch_results:
result_tokens = count_tokens(result, provider, model)
if group and (group_tokens + result_tokens) > max_cons_content:
merged = await _try_consolidate(template, group, provider, model, consolidation_params)
if merged is None:
next_level.extend(group)
else:
next_level.append(merged)
group = []
group_tokens = 0
group.append(result)
group_tokens += result_tokens
if group:
if len(group) == 1 and not next_level:
next_level.append(group[0])
else:
merged = await _try_consolidate(template, group, provider, model, consolidation_params)
if merged is None:
next_level.extend(group)
else:
next_level.append(merged)
print(f" Round {consolidation_round}: {prev_count} -> {len(next_level)}", flush=True)
batch_results = next_level
if len(batch_results) >= prev_count:
print(f" No progress, stopping", flush=True)
break
if consolidation_round >= MAX_ROUNDS:
print(f" Max rounds reached", flush=True)
break
return "\n\n---\n\n".join(batch_results)
async def _try_consolidate(template, group, provider, model, params):
prompt = template + "\n---\n".join(group)
try:
resp, _ = await call_llm(
provider=provider, model=model,
messages=[{"role": "user", "content": prompt}],
parameters=params, timeout=300,
)
return resp
except Exception:
# Returning None signals the caller to fall back to its
# raw-join behavior. call_llm has already done its
# central backoff and failed; immediate further retry
# here would be redundant.
return None
import os
import json
import re
import fnmatch
import hashlib
from datetime import date
audit_date = date.today().strftime("%b %d, %Y")
# =============================================================
# Parse input
# =============================================================
input_text = input_dict.get("inputText", "")
params = {}
if input_text.strip().startswith('{'):
try:
params = json.loads(input_text)
except json.JSONDecodeError:
pass
if not params.get('namespace') and not params.get('namespaces'):
match = re.search(r'(?:namespaces?|ns)[:\s]+([^\n]+?)(?:\s+asvs|$)', input_text, re.IGNORECASE)
if match:
raw = match.group(1).strip()
if ',' in raw:
params['namespaces'] = [n.strip() for n in raw.split(',')]
else:
params['namespace'] = raw
else:
match = re.search(r'(?:namespace|ns)[:\s]+([^\s,]+)', input_text, re.IGNORECASE)
if match:
params['namespace'] = match.group(1)
namespaces = params.get('namespaces') or ([params.get('namespace')] if params.get('namespace') else [])
asvs_sections = params.get('asvs_sections') or []
if isinstance(asvs_sections, str):
asvs_sections = [s.strip() for s in asvs_sections.split(',') if s.strip()]
if not asvs_sections and params.get('asvs'):
# Tolerate single-section call but warn
asvs_sections = [params['asvs']]
include_files = params.get('includeFiles', [])
severity_threshold = params.get('severityThreshold', '')
domain_context = params.get('domainContext', '')
false_positive_guidance = params.get('falsePositiveGuidance', [])
if not namespaces:
all_ns = data_store.list_namespaces()
file_ns = [ns for ns in all_ns if ns.startswith("files:")]
if file_ns:
namespaces = file_ns
if not namespaces:
return {"outputText": json.dumps({
"error": f"No namespaces provided. Available: {data_store.list_namespaces()}"
})}
if not asvs_sections:
return {"outputText": json.dumps({
"error": "No ASVS sections specified. Provide `asvs_sections` as a list."
})}
repo_name = "unknown"
for ns in namespaces:
if ns.startswith("files:"):
repo_name = ns.replace("files:", "")
break
# Single identifier line per bundle — first section serves as a
# short label, and the full section list shows what's in flight.
# Other startup info (namespaces, file scope, severity threshold)
# are constant across all bundles in a run; the orchestrator
# already prints them once. Don't repeat per-bundle.
bundle_label = asvs_sections[0] if asvs_sections else "?"
print(f"[bundle {bundle_label}] sections: {asvs_sections}", flush=True)
if len(asvs_sections) == 1:
print(f"[bundle {bundle_label}] WARNING: called with 1 section. "
f"For single-section audits prefer asvs_audit directly.", flush=True)
# =============================================================
# Model configuration
# =============================================================
SONNET_PROVIDER = "bedrock"
SONNET_MODEL = "us.anthropic.claude-sonnet-4-5-20250929-v1:0"
SONNET_PARAMS = {"temperature": 0.7, "max_tokens": 16384}
# T9: Haiku for relevance filtering — cheaper and faster
HAIKU_PROVIDER = "bedrock"
HAIKU_MODEL = "us.anthropic.claude-haiku-4-5-20251001-v1:0"
HAIKU_PARAMS = {"temperature": 0.3, "max_tokens": 8192}
OPUS_PROVIDER = "bedrock"
OPUS_MODEL = "us.anthropic.claude-opus-4-6-v1"
# Bundled output is bigger — give Opus more room to write per-section blocks
OPUS_PARAMS = {"temperature": 1, "reasoning_effort": "high", "max_tokens": 96000}
SONNET_CONTEXT = get_context_window(SONNET_PROVIDER, SONNET_MODEL)
HAIKU_CONTEXT = get_context_window(HAIKU_PROVIDER, HAIKU_MODEL)
OPUS_CONTEXT = get_context_window(OPUS_PROVIDER, OPUS_MODEL)
# T2: configurable concurrency
OPUS_CONCURRENCY = int(os.environ.get("OPUS_CONCURRENCY", "4"))
SONNET_CONCURRENCY = int(os.environ.get("SONNET_CONCURRENCY", "5"))
sonnet_semaphore = asyncio.Semaphore(SONNET_CONCURRENCY)
opus_semaphore = asyncio.Semaphore(OPUS_CONCURRENCY)
# Cache key uses ALL bundled sections so re-runs with same bundle hit cache
bundle_key = "+".join(sorted(asvs_sections))
cache_key_prefix = f"bundle-{bundle_key}-{'-'.join(namespaces)}"
relevance_cache_ns = data_store.use_namespace(f"audit-cache:relevance:{cache_key_prefix}")
analysis_cache_ns = data_store.use_namespace(f"audit-cache:analysis:{cache_key_prefix}")
# =============================================================
# Step 0: Load ASVS context for ALL bundled sections
# =============================================================
asvs_descriptions = {} # section_id -> description string
try:
asvs_ns = data_store.use_namespace("asvs")
for section_id in asvs_sections:
req = asvs_ns.get(f"asvs:requirements:{section_id}")
if req:
parts = [f"ASVS Requirement {section_id}"]
if req.get("req_description"):
parts.append(f"Description: {req['req_description']}")
if req.get("level"):
parts.append(f"Level: {req['level']}")
sec_id = req.get("section_id", "")
if sec_id:
sec = asvs_ns.get(f"asvs:sections:{sec_id}")
if sec:
parts.append(f"Section: {sec.get('section_name', '')}")
if sec.get("description"):
parts.append(f"Section Description: {sec['description']}")
asvs_descriptions[section_id] = "\n".join(parts)
else:
asvs_descriptions[section_id] = f"ASVS Requirement {section_id}"
except Exception as e:
print(f"[bundle {bundle_label}] WARNING: Could not load ASVS requirements: {e}", flush=True)
for section_id in asvs_sections:
asvs_descriptions[section_id] = f"ASVS Requirement {section_id}"
combined_asvs_description = "\n\n".join(
f"### Requirement {sid}\n{desc}" for sid, desc in asvs_descriptions.items()
)
# =============================================================
# Step 1: Read & filter files
# =============================================================
# The orchestrator's contract: namespaces[0] is the primary
# source-code namespace and is subject to include_files
# filtering. Subsequent namespaces (from supplementalData) are
# supplemental — guidance docs, threat models, vendored libs,
# config overlays, related-repo code — and should NOT be filtered
# by patterns that were generated for the source code. They load
# fully so the model sees them in every Opus call regardless of
# how discovery scoped the source files.
#
# Within supplemental namespaces we distinguish TWO kinds by
# namespace prefix:
# - "audit_guidance:*" → AUTHORITATIVE GUIDANCE
# Documents that calibrate which findings are real vs. by-
# design (project AGENTS.md, security_model.rst, etc.).
# Rendered later in a dedicated "Project Security Guidance
# (Authoritative)" prompt section, NOT as source files.
# - any other supplemental namespace → SUPPLEMENTAL CODE
# Vendored libraries, config files, related-repo overlays.
# Rendered as source code in the prompt and audited normally.
#
# Both kinds bypass the include_files / SKIP / relevance filters
# (operator opted them in explicitly). The distinction is purely
# how they appear in the final Opus prompt.
all_files = {}
supplemental_keys = set() # all non-primary keys (filter-exempt)
guidance_keys = set() # subset of supplemental from audit_guidance:* namespaces
primary_file_count = 0
for idx, ns in enumerate(namespaces):
is_primary = (idx == 0)
is_guidance = (not is_primary) and ns.startswith("audit_guidance:")
ns_store = data_store.use_namespace(ns)
keys = ns_store.list_keys()
if is_primary and include_files:
pre_filter_count = len(keys)
filtered_keys = [k for k in keys if any(
fnmatch.fnmatch(k, pattern) for pattern in include_files
)]
if not filtered_keys and pre_filter_count > 0:
# Discovery emitted include_files patterns that match
# zero keys in this namespace. Causes: hallucinated
# paths from Sonnet, wrong path prefix, fnmatch's
# `**` quirk, or repo-layout drift since discovery
# last ran. Fall back to the unfiltered key list
# rather than aborting with "No files found" and
# emitting empty per-section stubs — the audit will
# cost more tokens but actually produce findings.
print(f" [bundle] namespace '{ns}' (primary): "
f"include_files matched 0 of {pre_filter_count} "
f"keys — FALLING BACK to unfiltered. Bad "
f"discovery patterns (first 5):", flush=True)
for p in include_files[:5]:
print(f" - {p!r}", flush=True)
# `keys` left as-is (unfiltered)
else:
keys = filtered_keys
print(f" [bundle] namespace '{ns}' (primary): "
f"{len(keys)} keys after include_files filter", flush=True)
else:
if is_primary:
scope = "primary"
elif is_guidance:
scope = "supplemental-guidance"
else:
scope = "supplemental-code"
print(f" [bundle] namespace '{ns}' ({scope}): "
f"{len(keys)} keys (no filter)", flush=True)
file_contents = ns_store.get_many(keys) if keys else {}
for k, v in file_contents.items():
if v is not None:
content = v if isinstance(v, str) else json.dumps(v, default=str) if v else ""
all_files[k] = content
if is_primary:
primary_file_count += 1
else:
supplemental_keys.add(k)
if is_guidance:
guidance_keys.add(k)
# Guard checks the primary count specifically. An audit that
# has zero source files but loaded supplemental docs is still
# a degenerate case — the audit's job is to look at code, not
# documentation. Error out the same way as before.
# Error message text preserved verbatim so existing log/
# inspection tooling that pattern-matches on it (notably
# inspect_audit_findings.py looking for the "No files found
# in namespaces" substring) keeps working.
if primary_file_count == 0:
return {"outputText": json.dumps({
"error": f"No files found in namespaces {namespaces}"
})}
SKIP_DIRS = {
'node_modules', 'vendor', 'third_party', 'third-party',
'dist', 'build', 'out', 'target',
'__pycache__', '.pytest_cache', '.mypy_cache', 'coverage', '.next', '.nuxt',
'assets', 'images', 'img', 'static/images', 'static/fonts', 'static/webfonts',
'public/images', 'fonts', 'webfonts',
'.github/workflows',
'venv', '.venv', 'env', '.env',
'.git', '.idea', '.vscode',
}
SKIP_FILES = {
'package-lock.json', 'yarn.lock', 'poetry.lock', 'Cargo.lock',
'composer.lock', 'pnpm-lock.yaml', 'Gemfile.lock', 'uv.lock',
'LICENSE', 'LICENSE.md', 'LICENSE.txt',
'README.md', 'README.rst', 'README.txt', 'README',
'CHANGELOG.md', 'CHANGELOG', 'CONTRIBUTING.md', 'CODE_OF_CONDUCT.md',
'.gitignore', '.dockerignore', '.prettierrc', '.eslintrc', '.editorconfig',
'.npmrc', '.yarnrc',
}
SKIP_EXTENSIONS = {
'.min.js', '.min.css', '.bundle.js', '.bundle.css',
'.map', '.css', '.scss', '.less', '.sass', '.styl',
'.woff', '.woff2', '.ttf', '.eot', '.otf',
'.png', '.jpg', '.jpeg', '.gif', '.svg', '.ico', '.webp', '.bmp',
'.mp3', '.mp4', '.wav', '.avi', '.mov', '.webm', '.ogg',
'.pdf', '.doc', '.docx', '.xls', '.xlsx', '.ppt', '.pptx',
'.zip', '.tar', '.gz', '.rar', '.7z', '.lock',
'.exe', '.dll', '.so', '.dylib', '.pyc', '.pyo', '.class', '.o', '.obj',
}
def should_skip_file(filepath):
path_lower = filepath.lower()
parts = filepath.split('/')
for part in parts[:-1]:
if part.lower() in SKIP_DIRS:
return True
filename = parts[-1] if parts else ''
if filename in SKIP_FILES:
return True
for ext in SKIP_EXTENSIONS:
if path_lower.endswith(ext):
return True
return False
filtered_files = {}
skipped_count = 0
for key, content in all_files.items():
# Supplemental files (from supplementalData namespaces) bypass
# the skip rules entirely. The user explicitly opted in to
# including them, and the SKIP_FILES list contains things
# like README.md and CONTRIBUTING.md that are legitimate
# guidance-doc names — they'd be dropped otherwise.
if key in supplemental_keys:
filtered_files[key] = content
continue
if should_skip_file(key):
skipped_count += 1
continue
filtered_files[key] = content
if not filtered_files:
return {"outputText": json.dumps({
"error": f"All {len(all_files)} files filtered out by skip rules"
})}
# T5: file-set-hash inventory cache key
file_set_hash = hashlib.sha256(
"\n".join(sorted(filtered_files.keys())).encode()
).hexdigest()[:16]
inventory_cache_ns = data_store.use_namespace(f"audit-cache:inventory:{file_set_hash}")
# =============================================================
# Step 2: Relevance filtering (Haiku) [T9]
# =============================================================
if include_files:
relevant_files = filtered_files
relevance_scores = {path: 10 for path in filtered_files}
sorted_relevant = sorted(relevant_files.keys())
else:
cached_relevance = relevance_cache_ns.get("scores")
if cached_relevance:
relevance_scores = cached_relevance
else:
file_previews = {}
for path, content in filtered_files.items():
# Supplemental files are out-of-scope for relevance
# scoring — they're project guidance, not auditable
# code. Don't waste Haiku tokens rating them and
# don't let a low score drop them downstream; they
# get force-included below regardless.
if path in supplemental_keys:
continue
lines = content.split('\n')
file_previews[path] = '\n'.join(lines[:200])
SAFE_HAIKU_LIMIT = int(HAIKU_CONTEXT * 0.40)
relevance_prompt_template = f"""You are a security auditor performing file relevance filtering.
ASVS Requirements being audited (a file may be relevant to ANY of them):
{combined_asvs_description}
Below are file paths with previews (first ~200 lines) from a codebase.
Rate each file's relevance to ANY of the ASVS requirements above on a scale of 0-10:
- 10: Directly implements or should implement controls for these requirements
- 7-9: Contains related security controls, data handling, or configuration
- 4-6: May contain relevant patterns indirectly
- 1-3: Unlikely to be relevant
- 0: Definitely not relevant
Return ONLY a JSON object mapping file paths to relevance scores (integer 0-10).
Example: {{"src/auth.py": 9, "src/utils.py": 3}}
FILES TO EVALUATE:
"""
template_tokens = count_tokens(relevance_prompt_template, HAIKU_PROVIDER, HAIKU_MODEL)
preview_budget = SAFE_HAIKU_LIMIT - template_tokens
preview_batches = []
current_batch = {}
current_tokens = 0
for path, preview in file_previews.items():
entry = f"\n--- {path} ---\n{preview}\n"
entry_tokens = count_tokens(entry, HAIKU_PROVIDER, HAIKU_MODEL)
if current_tokens + entry_tokens > preview_budget and current_batch:
preview_batches.append(current_batch)
current_batch = {}
current_tokens = 0
current_batch[path] = entry
current_tokens += entry_tokens
if current_batch:
preview_batches.append(current_batch)
relevance_scores = {}
async def filter_batch(i, batch):
async with sonnet_semaphore:
entries_text = "".join(batch.values())
prompt = relevance_prompt_template + entries_text
messages = [{"role": "user", "content": prompt}]
# call_llm handles retries with exponential backoff.
# If we get here with an exception, retries have been
# exhausted (or it's a non-retryable error like a bad
# JSON response). Fall back to score=5 for every
# file in the batch — the audit will include them
# all rather than losing them silently.
try:
content_resp, _ = await call_llm(
provider=HAIKU_PROVIDER, model=HAIKU_MODEL,
messages=messages, parameters=HAIKU_PARAMS,
timeout=120,
)
json_match = re.search(r'\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}', content_resp, re.DOTALL)
if json_match:
scores = json.loads(json_match.group())
return scores
except Exception as e:
print(f"[bundle {bundle_label}] relevance batch {i+1} FAILED: {e}", flush=True)
print(f"[bundle {bundle_label}] WARNING: relevance batch {i+1} defaulting {len(batch)} files to score=5", flush=True)
return {path: 5 for path in batch}
batch_results = await asyncio.gather(*[
filter_batch(i, batch)
for i, batch in enumerate(preview_batches)
])
for scores in batch_results:
relevance_scores.update(scores)
relevance_cache_ns.set("scores", relevance_scores)
relevant_files = {}
for path, content in filtered_files.items():
# Supplemental files always pass the relevance gate.
# They weren't scored by Haiku (see exclusion above)
# and represent intentionally included context.
if path in supplemental_keys:
relevant_files[path] = content
continue
score = relevance_scores.get(path, 5)
if isinstance(score, (int, float)) and score >= 4:
relevant_files[path] = content
if len(relevant_files) < 3 and filtered_files:
for path, content in filtered_files.items():
if path in supplemental_keys:
continue # already included above
score = relevance_scores.get(path, 5)
if isinstance(score, (int, float)) and score >= 2:
relevant_files[path] = content
sorted_relevant = sorted(relevant_files.keys(),
key=lambda p: relevance_scores.get(p, 0),
reverse=True)
if not relevant_files:
return {"outputText": json.dumps({
"mode": "bundled",
"asvs_sections": asvs_sections,
"per_section": {sid: {
"report": _empty_section_report(sid, asvs_descriptions.get(sid, ""),
repo_name, audit_date,
0, len(filtered_files), skipped_count),
"findings": {"Critical": 0, "High": 0, "Medium": 0, "Low": 0},
"files_analyzed": 0,
"files_total": len(filtered_files),
"files_skipped": skipped_count,
} for sid in asvs_sections},
"raw_consolidated": "",
}, default=str)}
# =============================================================
# Step 3: Code inventory (Sonnet, file-set-hash cached) [T5]
# =============================================================
cached_inventory = inventory_cache_ns.get("result")
if cached_inventory:
code_inventory = cached_inventory
else:
inventory_prompt_template = """You are a security code analyst. Extract a structured code inventory from each file below.
For each file, produce:
1. **Imports** — list all imports (especially security-related: auth, crypto, validators, sanitizers, path handling)
2. **Classes** — name, base classes, key methods with signatures
3. **Functions** — name, parameters (with types if available), decorators, line numbers (approximate)
4. **Security-relevant patterns** — validators, auth checks, path operations, file operations, crypto operations
5. **Routes/endpoints** — any URL routing, decorators like @app.route, @router.get, etc.
6. **Configuration** — security-relevant config values, environment variables
Return the inventory as structured markdown. Be thorough but concise.
FILES:
"""
SAFE_SONNET_LIMIT_INV = int(SONNET_CONTEXT * 0.40)
inv_template_tokens = count_tokens(inventory_prompt_template, SONNET_PROVIDER, SONNET_MODEL)
inv_budget = SAFE_SONNET_LIMIT_INV - inv_template_tokens
inv_batches = []
current_batch = {}
current_tokens = 0
for path in sorted_relevant:
content = relevant_files[path]
entry = f"\n--- {path} ---\n{content}\n"
entry_tokens = count_tokens(entry, SONNET_PROVIDER, SONNET_MODEL)
if entry_tokens > inv_budget:
if current_batch:
inv_batches.append(current_batch)
current_batch = {}
current_tokens = 0
inv_batches.append({path: entry})
continue
if current_tokens + entry_tokens > inv_budget and current_batch:
inv_batches.append(current_batch)
current_batch = {}
current_tokens = 0
current_batch[path] = entry
current_tokens += entry_tokens
if current_batch:
inv_batches.append(current_batch)
async def inventory_batch(i, batch):
async with sonnet_semaphore:
entries_text = "".join(batch.values())
prompt = inventory_prompt_template + entries_text
messages = [{"role": "user", "content": prompt}]
msg_tokens = count_message_tokens(messages, SONNET_PROVIDER, SONNET_MODEL)
if msg_tokens > int(SONNET_CONTEXT * 0.80):
items = list(batch.items())
mid = len(items) // 2
results = []
for sub_items in [items[:mid], items[mid:]]:
sub_text = "".join([v for _, v in sub_items])
sub_messages = [{"role": "user", "content": inventory_prompt_template + sub_text}]
try:
resp, _ = await call_llm(
provider=SONNET_PROVIDER, model=SONNET_MODEL,
messages=sub_messages, parameters=SONNET_PARAMS,
timeout=300,
)
results.append(resp)
except Exception as e:
print(f"[bundle {bundle_label}] inventory sub-batch failed: {e}", flush=True)
return "\n\n".join(results)
# Retries handled centrally in call_llm. On failure
# the inventory entry for this batch is dropped; the
# caller filters empty results before joining.
try:
resp, _ = await call_llm(
provider=SONNET_PROVIDER, model=SONNET_MODEL,
messages=messages, parameters=SONNET_PARAMS,
timeout=300,
)
return resp
except Exception as e:
print(f"[bundle {bundle_label}] inventory batch {i+1} FAILED: {e}", flush=True)
return ""
inventory_results = await asyncio.gather(*[
inventory_batch(i, batch)
for i, batch in enumerate(inv_batches)
])
code_inventory = "\n\n---\n\n".join([r for r in inventory_results if r])
inventory_cache_ns.set("result", code_inventory)
# =============================================================
# Step 4: Bundled deep analysis (Opus)
# =============================================================
requirements_block = "\n\n".join(
f"### ASVS Requirement {sid}\n{desc}"
for sid, desc in asvs_descriptions.items()
)
analysis_system_prompt = f"""You are an expert application security auditor performing a comprehensive security audit against MULTIPLE ASVS requirements simultaneously.
## ASVS Requirements Under Audit
You are auditing the code below against ALL of the following requirements. For EACH requirement, you must produce a complete findings section.
{requirements_block}
"""
if domain_context:
analysis_system_prompt += f"\n## Domain Context\n{domain_context}\n"
if severity_threshold:
severity_levels = {"CRITICAL": 5, "HIGH": 4, "MEDIUM": 3, "LOW": 2, "INFO": 1, "INFORMATIONAL": 1}
threshold_val = severity_levels.get(severity_threshold.upper(), 0)
if threshold_val > 0:
included_set = {k for k, v in severity_levels.items() if v >= threshold_val}
included_set.discard("INFORMATIONAL")
included = sorted(included_set, key=lambda k: -severity_levels[k])
analysis_system_prompt += f"\n## Severity Threshold\nOnly report findings at these severity levels: {', '.join(included)}.\nDo not include findings below {severity_threshold.upper()} severity.\n"
if false_positive_guidance:
guidance_text = "\n".join(f"- {g}" for g in false_positive_guidance)
analysis_system_prompt += f"\n## Known False Positive Patterns (DO NOT FLAG)\nThe following patterns are intentional design decisions in this codebase. Do not report them as vulnerabilities:\n{guidance_text}\n"
# Wire AUDIT GUIDANCE files (from audit_guidance:* namespaces) into
# the prompt as authoritative project guidance — NOT as source code
# to audit. This is the final stage of the audit_guidance pipeline:
#
# asvs_guidance_ingest → CouchDB audit_guidance:{repo}
# orchestrator namespaces → bundle loads with filter exemption
# THIS BLOCK → inject as guidance, remove from source scope
#
# Other supplemental files (non-audit_guidance:* namespaces — vendored
# libraries, related-repo overlays, config files) stay in
# relevant_files and get rendered as source code below. They share the
# filter exemptions but not the "authoritative, do not flag" framing.
#
# Without this, AGENTS.md and similar docs would reach the prompt but
# Opus would audit them as if they were source code (flagging the
# existence of AGENTS.md itself as a security issue, ignoring its
# "What is NOT considered a vulnerability" section, etc.).
if guidance_keys:
guidance_parts = []
for k in sorted(guidance_keys):
if k in relevant_files:
guidance_parts.append(f"### {k}\n\n{relevant_files[k]}")
if guidance_parts:
guidance_block = "\n\n".join(guidance_parts)
analysis_system_prompt += (
"\n## Project Security Guidance (Authoritative)\n"
"The following documents are provided by the project's own maintainers "
"as guidance on what this codebase considers a vulnerability versus a "
"documented design decision, known limitation, or deployment-manager "
"responsibility. Treat this content as AUTHORITATIVE: when a potential "
"finding aligns with content marked here as \"by design\", \"not a "
"vulnerability\", \"documented limitation\", \"known limitation\", or "
"\"deployment-manager responsibility\", do NOT report it as a finding. "
"Note it under positive controls instead, or omit it.\n\n"
"These documents are INSTRUCTIONS TO YOU about how to interpret the "
"source code. They are NOT source code under review. Apply ALL of the "
"following rules without exception:\n\n"
"1. Do not raise findings against these guidance documents themselves "
"(structure, completeness, consolidation, organization, formatting, "
"coverage). Observations like \"guidance is scattered across multiple "
"files\", \"no consolidated register exists\", \"documentation is "
"incomplete\", or \"a dangerous-functionality inventory is missing\" "
"are meta-observations on the policy itself, not vulnerabilities in "
"the codebase.\n"
"2. Do not list any guidance document path in a finding's `files`, "
"`affected_files`, `related_files`, `source_reports`, `evidence`, or "
"any equivalent field. Guidance documents are never affected components.\n"
"3. Do not quote, paraphrase, or cite the guidance document filenames "
"in finding descriptions, remediation text, or rationale. If you want "
"to say \"this is consistent with documented design\", say so without "
"naming the guidance file.\n"
"4. If a guidance document acknowledges a known gap (\"we don't enforce "
"X at the library layer\"), that acknowledgement IS the resolution. Do "
"not raise a finding asking the project to do X. The guidance is the "
"answer to that finding.\n\n"
f"{guidance_block}\n"
)
# Remove guidance keys from source scope so they aren't also
# rendered as code-fenced files-to-audit below. Their content is
# already present in the guidance section above.
for k in list(guidance_keys):
relevant_files.pop(k, None)
sorted_relevant = [k for k in sorted_relevant if k not in guidance_keys]
analysis_system_prompt += """
## Audit Instructions
Follow ALL of these analysis requirements:
### Scope Check — do this FIRST for EACH requirement, before generating findings
Bundle audits cover multiple ASVS requirements simultaneously. Some
target SPECIFIC architectural patterns and apply only to systems that
exhibit those patterns. Per requirement: verify the audited codebase
actually uses the targeted pattern. If not, produce a single N/A
finding for that requirement explaining why, and move on.
Concrete examples (not exhaustive):
- **Requirements about REFERENCE tokens** (e.g. 7.2.3) apply only to
systems issuing opaque reference tokens looked up server-side.
Systems using self-contained tokens (JWT, signed cookies, PASETO)
are OUT OF SCOPE — their security is governed by signing/algorithm
requirements in V9. DO NOT apply reference-token requirements to
JWT signing keys.
- **OAuth Authorization Server requirements** (V10.4.x) apply only to
systems acting as an OAuth provider. OAuth clients are out of scope.
- **WebSocket requirements** (4.4.x) apply only when WebSocket is used.
- **XML parser requirements** (1.5.1) apply only when XML is parsed.
- **File-upload requirements** (V5.2.x) apply only when user file
uploads are accepted.
If the codebase does not exhibit the targeted pattern for a given
requirement, return a single N/A finding for THAT requirement (not all)
with a clear explanation of WHY it is not in scope. Do NOT stretch a
requirement to fit some loosely related aspect of the code.
### Core Principle: Existence ≠ Application
For each security control found:
- Document where it's DEFINED
- Map ALL entry points that should use it
- Verify it's actually CALLED at each entry point
- Flag coverage gaps where the control exists but is not applied
### Severity Calibration — Apache Software Foundation criteria
Severity follows the ASF Security Team's published criteria for ASF projects.
These override any default tendency to rate findings by CWE category, ASVS
section, or gap shape alone.
- **Critical** — easily exploited by a remote unauthenticated attacker,
leads to RCE or full system compromise, no user interaction. NOT
Critical if exploitation requires authentication, local/physical access,
unusual configuration, user interaction, or prior compromise.
- **High** (ASF Important) — easily compromises C/I/A under realistic
conditions: local/auth user gains privileges, unauth remote user views
authentication-protected resources, auth remote user achieves RCE,
remote user causes DoS. Requires both a real attacker capability AND
a real C/I/A impact.
- **Medium** (ASF Moderate) — could compromise C/I/A under certain
circumstances: more difficult to exploit, unlikely configuration,
limited scope, or control bypass that requires application-layer
cooperation to cause real harm. Foot-guns and easy-to-misuse defaults
belong here.
- **Low** — security-relevant but minimal consequences or unlikely
circumstances: defense-in-depth gaps where another layer prevents
exploitation, documentation deficiencies that don't enable exploit,
dead dependency pins, hardening recommendations without a concrete
exploit path, nice-to-have library improvements.
- **Informational** — observations with security relevance that do not
constitute vulnerabilities. Documentation gaps governed by
foundation-level processes (e.g., ASF Security Team for ASF projects)
rate Informational.
### Severity calibration — apply per finding
For every finding, answer three questions before assigning severity:
1. **Attacker capability required** — remote unauth (Critical/High) /
authenticated (High/Medium) / privileged or local (Medium/Low) /
specific unusual configuration (Medium/Low).
2. **What success achieves** — RCE (Critical/High) / priv esc (High) /
data access (High/Medium) / DoS (High/Medium) / info disclosure
(varies by sensitivity) / control bypass with no direct C/I/A impact
(Medium/Low).
3. **Exploitability in default deployment** — trivially exploitable in
default config pushes up; requires app-layer cooperation or unusual
conditions pushes down.
If the answers do not justify Critical or High under ASF criteria, the
severity is lower — even when the finding's shape looks like a control-
flow gap.
### Gap Type Classification — pattern detection, not severity assignment
These shapes help recognize what KIND of finding you have. They do NOT
by themselves determine severity. After classifying the shape, apply the
calibration questions above.
| Gap Type | Description |
|----------|-------------|
| Type A | Entry point with NO control |
| Type B | Control EXISTS but NOT CALLED at this entry point |
| Type C | Control CALLED but RESULT IGNORED |
| Type D | Control CALLED but AFTER the sensitive operation |
Gap shape sets a ceiling on plausibility, not a floor on severity. A
Type B/C/D gap rates Critical only when the calibration questions yield
"remote unauthenticated RCE in default configuration." A Type B/C/D gap
that requires authenticated access and yields only control bypass without
direct C/I/A impact rates Medium at most. Do not auto-elevate gap-shape
findings to Critical without verifying the calibration supports it.
### Related Function Analysis
When you find a vulnerability, IMMEDIATELY search for:
- Singular/plural variants
- Sync/async variants
- Public/private variants
- Same-file functions with similar parameters or operations
### False Positive Prevention
Before finalizing each finding:
1. Where does this input ACTUALLY originate? Is it truly user-controllable?
2. Is there validation applied EARLIER in the call chain?
3. Can an external attacker actually control this value?
4. If you listed something as a positive pattern, don't also list it as a vulnerability
### Exclusions
Do NOT report:
- Database-sourced values without injection path
- Already-validated inputs
- Developer tooling/test code
- Issues requiring prior compromise
- Theoretical issues without specific exploit
- Test/example code
## Output Format — CRITICAL FOR DOWNSTREAM PROCESSING
For EACH ASVS requirement listed above, produce a section with this EXACT header:
```
## ASVS-{section_id}: <name>
### Findings for {section_id}
<all findings for this requirement, grouped by severity Critical → High → Medium → Low>
### Security Controls Inventory for {section_id}
<controls relevant to this requirement, with location and coverage status>
### Positive Patterns for {section_id}
<positive patterns specific to this requirement>
```
Use the EXACT header `## ASVS-{section_id}:` (with the dash and colon) for each requirement section.
This is parsed by automated tooling — deviations break the parser.
If a requirement has no findings, still produce the section with all three subsections,
explicitly stating "No findings detected" or "No applicable controls in this scope."
After ALL per-requirement sections, end with two cross-cutting sections:
```
## Cross-cutting Architecture Observations
<observations that span multiple requirements>
## Cross-cutting Recommendations
<prioritized: Immediate, Short-term, Long-term>
```
For each finding within a section, provide:
- Severity level: CRITICAL, HIGH, MEDIUM, LOW, or INFO (in a `### [SEVERITY]` header) — assigned per the ASF Severity Calibration above
- Finding ID: ASVS-{section_no_dots}-SEV-NNN (e.g. ASVS-512-CRIT-001; SEV token is one of CRIT, HIGH, MED, LOW, INFO matching the assigned severity)
- Exact file location and function name with line numbers
- Vulnerable code quote (a fenced code block)
- Data flow: source → sink → missing control
- Attacker capability required (answer to calibration question 1)
- Impact on success (answer to calibration question 2)
- Proof of concept: a specific malicious request or input — required for Critical and High; if you cannot construct one consistent with the stated attacker capability, downgrade
- Remediation with code example
Be thorough but precise. If something is done correctly, acknowledge it as a positive pattern — don't invent issues."""
SAFE_OPUS_LIMIT = int(OPUS_CONTEXT * 0.40)
system_tokens = count_tokens(analysis_system_prompt, OPUS_PROVIDER, OPUS_MODEL)
inventory_section = f"\n\n## Code Inventory (extracted by pre-analysis)\n\n{code_inventory}\n\n"
inventory_tokens = count_tokens(inventory_section, OPUS_PROVIDER, OPUS_MODEL)
user_template = "## Source Code Files\n\nAnalyze the following files for security issues related to ALL the ASVS requirements listed in the system prompt:\n\n"
user_template_tokens = count_tokens(user_template, OPUS_PROVIDER, OPUS_MODEL)
max_inv_tokens = int(SAFE_OPUS_LIMIT * 0.15)
if inventory_tokens > max_inv_tokens:
print(f"[bundle {bundle_label}] truncating inventory from {inventory_tokens} to {max_inv_tokens} tokens", flush=True)
inv_lines = code_inventory.split('\n')
truncated_inv = ""
for line in inv_lines:
candidate = truncated_inv + line + "\n"
if count_tokens(candidate, OPUS_PROVIDER, OPUS_MODEL) > max_inv_tokens:
break
truncated_inv = candidate
inventory_section = f"\n\n## Code Inventory (extracted by pre-analysis, truncated)\n\n{truncated_inv}\n\n"
inventory_tokens = count_tokens(inventory_section, OPUS_PROVIDER, OPUS_MODEL)
opus_content_budget = SAFE_OPUS_LIMIT - system_tokens - inventory_tokens - user_template_tokens
opus_batches = []
current_batch = {}
current_tokens = 0
for path in sorted_relevant:
content = relevant_files[path]
entry = f"\n### File: `{path}`\n```\n{content}\n```\n"
entry_tokens = count_tokens(entry, OPUS_PROVIDER, OPUS_MODEL)
if entry_tokens > opus_content_budget:
if current_batch:
opus_batches.append(current_batch)
current_batch = {}
current_tokens = 0
opus_batches.append({path: entry})
continue
if current_tokens + entry_tokens > opus_content_budget and current_batch:
opus_batches.append(current_batch)
current_batch = {}
current_tokens = 0
current_batch[path] = entry
current_tokens += entry_tokens
if current_batch:
opus_batches.append(current_batch)
print(f"[bundle {bundle_label}] Opus: {len(opus_batches)} batch(es)", flush=True)
async def analyze_batch(i, batch):
cache_key = f"batch-{i}"
cached = analysis_cache_ns.get(cache_key)
if cached:
print(f"[bundle {bundle_label}] Opus batch {i+1}: cached", flush=True)
return cached
async with opus_semaphore:
entries_text = "".join(batch.values())
user_content = user_template + entries_text + inventory_section
messages = [
{"role": "user", "content": analysis_system_prompt + "\n\n" + user_content}
]
msg_tokens = count_message_tokens(messages, OPUS_PROVIDER, OPUS_MODEL)
limit = int(OPUS_CONTEXT * 0.80)
print(f"[bundle {bundle_label}] Opus batch {i+1}/{len(opus_batches)}: {msg_tokens} tokens, {len(batch)} files", flush=True)
if msg_tokens > limit:
items = list(batch.items())
if len(items) > 1:
mid = len(items) // 2
results = []
for half_label, half_items in [("a", items[:mid]), ("b", items[mid:])]:
half_text = "".join([v for _, v in half_items])
half_user = user_template + half_text + inventory_section
half_messages = [{"role": "user", "content": analysis_system_prompt + "\n\n" + half_user}]
# call_llm handles retries (rate-limit and
# timeout). On exhaustion we keep a sentinel
# string so the bundle still produces output
# for the surviving halves; the failed-batch
# filter downstream strips these out.
try:
resp, _ = await call_llm(
provider=OPUS_PROVIDER, model=OPUS_MODEL,
messages=half_messages, parameters=OPUS_PARAMS,
timeout=1800,
)
results.append(resp)
print(f"[bundle {bundle_label}] Opus batch {i+1} sub-{half_label} complete", flush=True)
except Exception as e:
print(f"[bundle {bundle_label}] Opus batch {i+1} sub-{half_label} FAILED: {e}", flush=True)
results.append(f"[Analysis failed for sub-batch {i+1}{half_label}: {str(e)[:200]}]")
combined = "\n\n---\n\n".join(results)
analysis_cache_ns.set(cache_key, combined)
return combined
else:
key, entry_val = items[0]
slim_messages = [{"role": "user", "content": analysis_system_prompt + "\n\n" + user_template + entry_val}]
try:
resp, _ = await call_llm(
provider=OPUS_PROVIDER, model=OPUS_MODEL,
messages=slim_messages, parameters=OPUS_PARAMS,
timeout=1800,
)
analysis_cache_ns.set(cache_key, resp)
return resp
except Exception as e:
return f"[Analysis failed for {key}: {str(e)[:200]}]"
try:
resp, _ = await call_llm(
provider=OPUS_PROVIDER, model=OPUS_MODEL,
messages=messages, parameters=OPUS_PARAMS,
timeout=1800,
)
analysis_cache_ns.set(cache_key, resp)
print(f"[bundle {bundle_label}] Opus batch {i+1} complete", flush=True)
return resp
except Exception as e:
print(f"[bundle {bundle_label}] Opus batch {i+1} FAILED: {e}", flush=True)
return f"[Analysis failed for batch {i+1}: {str(e)[:200]}]"
# =============================================================
# GUARDRAIL: distinguish "no work needed" from "all work failed"
# =============================================================
# Before this guardrail, a bundle with zero opus_batches (code
# inventory determined nothing relevant to audit) fell through
# to the same `analysis_results == []` path as a bundle whose
# batches all crashed, and both returned the same `error:
# "All analysis batches failed"` envelope. The orchestrator's
# parser couldn't recognize the envelope as bundled output,
# silently attributed the error JSON to the first section,
# and emitted "did not return per-section output" stubs for
# the rest. The stored stubs were then read by the consolidator
# as legitimate N/A.
#
# The two cases need different handling:
# - 0 batches => no relevant code; legitimate N/A
# - all crashed => real failure; surface loudly
if not opus_batches:
print(
f"[bundle {bundle_label}] no Opus batches needed (code inventory "
f"determined no relevant implementation); emitting per-section N/A",
flush=True,
)
per_section_na = {}
for sid in asvs_sections:
desc = asvs_descriptions.get(sid, f"ASVS Requirement {sid}")
na_body = (
f"## ASVS-{sid}\n\n"
f"**Status:** N/A\n\n"
f"**Reason:** Code inventory of the audited scope determined "
f"that no implementation relevant to this requirement exists "
f"in the repository. The framework does not appear to provide "
f"functionality covered by ASVS {sid}.\n"
)
report = _format_section_report(
sid, desc,
repo_name, audit_date,
len(relevant_files), len(filtered_files), skipped_count,
{"Critical": 0, "High": 0, "Medium": 0, "Low": 0},
na_body, "",
)
per_section_na[sid] = {
"report": report,
"findings": {"Critical": 0, "High": 0, "Medium": 0, "Low": 0},
"files_analyzed": len(relevant_files),
"files_total": len(filtered_files),
"files_skipped": skipped_count,
"status": "N/A",
"reason": "no_relevant_code",
}
return {"outputText": json.dumps({
"mode": "bundled",
"asvs_sections": asvs_sections,
"per_section": per_section_na,
"bundle_status": "no_relevant_code",
})}
analysis_results = await asyncio.gather(*[
analyze_batch(i, batch)
for i, batch in enumerate(opus_batches)
])
attempted = len(opus_batches)
analysis_results = [r for r in analysis_results if r and not r.startswith("[Analysis failed")]
if not analysis_results:
# GUARDRAIL: opus_batches existed (work was attempted) but
# every batch crashed. Tag the envelope so the orchestrator's
# parser recognizes this as a real failure rather than a
# bundled mode-output. See asvs_orchestrate.py
# `_parse_audit_output` error-envelope branch.
print(
f"[bundle {bundle_label}] ALL OPUS BATCHES FAILED "
f"(attempted={attempted}); returning error envelope",
flush=True,
)
return {"outputText": json.dumps({
"error": "All analysis batches failed",
"asvs_sections": asvs_sections,
"attempted_batches": attempted,
"bundle_status": "all_batches_failed",
})}
# =============================================================
# Step 5: Consolidation across batches [T8 — lazy rounds]
# =============================================================
if len(analysis_results) == 1:
consolidated_analysis = analysis_results[0]
elif len(analysis_results) <= 4:
print(f"[bundle {bundle_label}] consolidating {len(analysis_results)} results (single-pass)", flush=True)
consolidated_analysis = await _single_pass_consolidate(
analysis_results, combined_asvs_description,
SONNET_PROVIDER, SONNET_MODEL, SONNET_PARAMS,
)
else:
print(f"[bundle {bundle_label}] consolidating {len(analysis_results)} results (multi-round)", flush=True)
consolidated_analysis = await _multi_round_consolidate(
analysis_results, combined_asvs_description,
SONNET_PROVIDER, SONNET_MODEL, SONNET_PARAMS, SONNET_CONTEXT,
)
# =============================================================
# Step 6: Split bundled output per section
# =============================================================
per_section = _split_bundled_output(
consolidated_analysis, asvs_sections, asvs_descriptions,
repo_name, audit_date,
len(relevant_files), len(filtered_files), skipped_count,
)
# Persist a summary of each per-section report
report_ns = data_store.use_namespace("audit-reports")
for sid, sec_data in per_section.items():
report_key = f"asvs-{sid}-{'-'.join(namespaces)}"
report_ns.set(report_key, {
"asvs": sid,
"namespaces": namespaces,
"files_analyzed": sec_data["files_analyzed"],
"files_total": sec_data["files_total"],
"files_skipped": sec_data["files_skipped"],
"findings": sec_data["findings"],
"report": sec_data["report"][:50000],
"bundled_with": [s for s in asvs_sections if s != sid],
})
envelope = {
"mode": "bundled",
"asvs_sections": asvs_sections,
"per_section": per_section,
"raw_consolidated": consolidated_analysis,
"metadata": {
"files_analyzed": len(relevant_files),
"files_total": len(filtered_files),
"files_skipped": skipped_count,
"opus_batches": len(opus_batches),
"repo": repo_name,
"audit_date": audit_date,
},
}
total_findings = sum(
sum(s["findings"].values()) for s in per_section.values()
)
print(f"[bundle {bundle_label}] done: {len(asvs_sections)} sections, {total_findings} findings", flush=True)
return {"outputText": json.dumps(envelope, default=str)}
finally:
await http_client.aclose()