blob: 69ef973923970902b1957ffcdc74c8941d9d6b27 [file]
# asvs_load_data
#
# Pre-flight bootstrap: loads OWASP ASVS v5.x into the `asvs` data store
# namespace so the rest of the pipeline has the authoritative requirement
# catalogue to validate against. Run this once per ASVS version before the
# first audit; safe to re-run when OWASP publishes a new patch.
#
# Source data: pulls the canonical CSV from the OWASP/ASVS GitHub repo at
# 5.0/docs_en/OWASP_Application_Security_Verification_Standard_<ver>_en.csv,
# which gives chapter / section / requirement structure plus the L1/L2/L3
# level for each requirement. The CSV alone is enough for an audit run.
#
# Optional markdown enrichment: chapter "control objectives" and per-section
# descriptive blurbs aren't in the CSV — they live in 5.0/en/0x*-V*-*.md
# in the same repo. With `enrichMarkdown=true` (default) this agent fetches
# those chapter files in parallel and parses out the prose paragraphs that
# sit between the chapter heading and the first section, and between each
# section heading and its requirement table. Worth doing because audit
# prompts use these as context for Opus.
#
# Inputs (all optional, all read from named fields with inputText fallback):
# version ASVS tag like "v5.0.0" or "5.0.0" (default: v5.0.0)
# clear Drop existing asvs:* keys before write (default: false)
# Default false is safe — same-id writes overwrite in
# place. Use true if you're switching between ASVS
# versions and want to purge orphan IDs from the prior
# version.
# githubToken Optional PAT to raise GitHub's rate limit during the
# markdown enrichment pass (~16 file fetches). Without
# a token the unauthenticated 60 req/hr limit is usually
# enough but you'll see WARNING: ...rate-limited if not.
# enrichMarkdown Toggle the markdown fetch+parse phase (default: true).
# Set false for a faster reload when only requirement
# text matters and chapter/section context isn't needed.
#
# Output: writes three key families into the `asvs` namespace:
# asvs:chapters:<N> {chapter_id, chapter_name, control_objective}
# asvs:sections:<N.M> {section_id, section_name, chapter_id, description}
# asvs:requirements:<N.M.K> {req_id, req_description, level, section_id, chapter_id}
#
# These keys are what asvs_discover and asvs_audit/asvs_bundle read at
# Step 0 ("Loading ASVS requirement context"). If you see the audit agents
# logging "WARNING: No data found for asvs:requirements:X.Y.Z" — either
# this loader hasn't run, the version mismatches, or a downstream caller
# is asking for an ID that doesn't exist in the loaded version (see the
# discovery hallucination protection in asvs_discover and asvs_orchestrate).
#
# Restrictions: v5.x only. ASVS v4 has a different file layout and column
# schema and is rejected with an error rather than silently producing
# wrong data.
from agent_factory.remote_mcp_client import RemoteMCPClient
import httpx
async def run(input_dict, tools):
mcpc = { url : RemoteMCPClient(remote_url = url) for url in tools.keys() }
http_client = httpx.AsyncClient(timeout=60.0)
try:
import csv
import io
import re
import asyncio
# =============================================================
# Read inputs from their own fields. Anything in inputText is
# also accepted as a fallback for backwards compatibility.
# =============================================================
def _coerce_bool(v, default=False):
if isinstance(v, bool):
return v
if v is None:
return default
return str(v).strip().lower() in ("true", "1", "yes", "y", "on")
TAG_RE = re.compile(r"^v?\d+\.\d+\.\d+$")
def _normalize_tag(s):
if s is None:
return None
s = str(s).strip()
if not s or not TAG_RE.match(s):
return None
return s if s.startswith("v") else f"v{s}"
# Primary: discrete fields (which is how this agent's UI is wired).
version_raw = input_dict.get("version") or input_dict.get("tag") or ""
clear_raw = input_dict.get("clear")
if clear_raw is None:
clear_raw = input_dict.get("clearExisting")
token_raw = input_dict.get("githubToken") or input_dict.get("token") or ""
enrich_raw = input_dict.get("enrichMarkdown")
# Fallback: parse inputText for the same keys (one per line, k: v).
input_text = (input_dict.get("inputText") or "").strip()
if input_text:
for raw_line in input_text.split("\n"):
line = raw_line.strip()
if not line or ":" not in line:
continue
k, _, v = line.partition(":")
k = k.strip().lower()
v = v.strip()
if k in ("version", "tag") and not version_raw:
version_raw = v
elif k in ("clear", "clear_existing", "clearexisting") and clear_raw is None:
clear_raw = v
elif k in ("token", "github_token", "githubtoken") and not token_raw:
token_raw = v
elif k in ("enrich_markdown", "enrichmarkdown", "markdown") and enrich_raw is None:
enrich_raw = v
tag = _normalize_tag(version_raw) or "v5.0.0"
if version_raw and _normalize_tag(version_raw) is None:
return {"outputText": (
f"Error: invalid version '{version_raw}' (expected vN.N.N or N.N.N)"
)}
clear_existing = _coerce_bool(clear_raw, default=False)
github_token = (token_raw or "").strip()
enrich_markdown = _coerce_bool(enrich_raw, default=True)
ver = tag.lstrip("v")
if not ver.startswith("5."):
return {"outputText": (
f"Error: only ASVS v5.x is supported (got {tag}). "
f"v4 has a different file layout."
)}
csv_url = (
f"https://raw.githubusercontent.com/OWASP/ASVS/{tag}/5.0/docs_en/"
f"OWASP_Application_Security_Verification_Standard_{ver}_en.csv"
)
gh_headers = {
"Accept": "application/vnd.github+json",
"X-GitHub-Api-Version": "2022-11-28",
}
if github_token:
gh_headers["Authorization"] = f"Bearer {github_token}"
print(f"Loading ASVS {tag}", flush=True)
print(f" CSV: {csv_url}", flush=True)
print(f" clear_existing: {clear_existing}", flush=True)
print(f" enrich_markdown: {enrich_markdown}", flush=True)
# =============================================================
# Fetch + validate CSV
# =============================================================
csv_resp = await http_client.get(csv_url, follow_redirects=False)
if csv_resp.status_code != 200:
return {"outputText": (
f"Error fetching CSV ({csv_resp.status_code}): {csv_url}\n"
f"Tag '{tag}' may not exist, or the CSV may not be at the expected path "
f"in this version. Check https://github.com/OWASP/ASVS/tree/{tag}/5.0/docs_en\n"
f"Response body (first 500 chars): {csv_resp.text[:500]}"
)}
csv_text = csv_resp.content.decode("utf-8-sig", errors="replace")
EXPECTED_HEADER = "chapter_id,chapter_name,section_id,section_name,req_id,req_description,L"
first_line = csv_text.splitlines()[0] if csv_text else ""
if first_line.strip() != EXPECTED_HEADER:
return {"outputText": (
f"Error: response from {csv_url} is not the expected ASVS CSV.\n"
f"Expected header:\n {EXPECTED_HEADER}\n"
f"Got first line:\n {first_line[:200]}"
)}
# =============================================================
# Parse CSV
# =============================================================
chapters = {}
sections = {}
requirements = {}
reader = csv.DictReader(io.StringIO(csv_text))
rows_seen = 0
for row in reader:
rows_seen += 1
chapter_id = row["chapter_id"].lstrip("V").strip()
section_id = row["section_id"].lstrip("V").strip()
req_id = row["req_id"].lstrip("V").strip()
try:
level = int(row["L"])
except (ValueError, KeyError):
level = 1
chapters.setdefault(chapter_id, {
"chapter_id": chapter_id,
"chapter_name": row["chapter_name"].strip(),
"control_objective": "",
})
sections.setdefault(section_id, {
"section_id": section_id,
"section_name": row["section_name"].strip(),
"chapter_id": chapter_id,
"description": "",
})
requirements[req_id] = {
"req_id": req_id,
"req_description": row["req_description"].strip(),
"level": level,
"section_id": section_id,
"chapter_id": chapter_id,
}
print(f" Parsed: {rows_seen} rows -> {len(chapters)} chapters, "
f"{len(sections)} sections, {len(requirements)} requirements", flush=True)
if not requirements:
return {"outputText": f"Error: CSV parsed but produced 0 requirements. URL: {csv_url}"}
# =============================================================
# Markdown enrichment for control_objective and section descriptions.
# The CSV doesn't carry these and the JSON file only carries
# structural metadata; chapter intros and section blurbs live only
# in the per-chapter markdown files at 5.0/en/0x*-V*-*.md.
# =============================================================
ch_enriched = 0
sec_enriched = 0
md_skipped_reason = None
if enrich_markdown:
api_base = f"https://api.github.com/repos/OWASP/ASVS"
print(f" Listing chapter markdown files...", flush=True)
# Markdown lives in 5.0/en/ — the docs_en/ folder is for
# generated artifacts (CSV/JSON/PDF/DOCX) only.
list_resp = await http_client.get(
f"{api_base}/contents/5.0/en?ref={tag}",
headers=gh_headers,
)
if list_resp.status_code != 200:
md_skipped_reason = f"contents API returned {list_resp.status_code}"
if list_resp.status_code in (403, 429):
md_skipped_reason += " (rate-limited; pass githubToken to raise the limit)"
print(f" WARNING: {md_skipped_reason}; skipping markdown enrichment "
f"(tried 5.0/en/)", flush=True)
else:
items = list_resp.json()
# Match chapter files: 0x10-V1-Encoding.md, 0x11-V2-..., etc.
md_re = re.compile(r"^0x[0-9a-fA-F]+-V(\d+)[-.].*\.md$")
md_files = []
for it in items:
if it.get("type") != "file":
continue
name = it.get("name", "")
m = md_re.match(name)
if m and it.get("download_url"):
md_files.append((int(m.group(1)), name, it["download_url"]))
md_files.sort()
print(f" Found {len(md_files)} chapter markdown files", flush=True)
sem = asyncio.Semaphore(4)
async def _fetch_md(name, url):
async with sem:
try:
r = await http_client.get(url, headers=gh_headers, follow_redirects=True)
if r.status_code == 200:
return name, r.text
print(f" {name}: HTTP {r.status_code}", flush=True)
except Exception as e:
print(f" {name}: fetch failed ({type(e).__name__}: {e})", flush=True)
return name, None
fetched = await asyncio.gather(*[_fetch_md(n, u) for _, n, u in md_files])
# =====================================================
# Markdown layout (v5):
# # V<N> <Chapter Name>
# <control objective paragraph(s)>
# ## V<N>.<M> <Section Name>
# <section description paragraph(s)>
# | # | Description | L | <-- requirement table starts
# =====================================================
chapter_h_re = re.compile(r"^# V(\d+)\s+(.+?)\s*$", re.MULTILINE)
section_h_re = re.compile(r"^## V(\d+\.\d+)\s+(.+?)\s*$", re.MULTILINE)
table_start_re = re.compile(r"^\s*\|\s*#\s*\|", re.MULTILINE)
def _clean(text):
# Drop a "## Control Objective" sub-header if present.
text = re.sub(r"^##\s+Control Objective\s*$", "", text, flags=re.MULTILINE)
paragraphs = [p.strip() for p in re.split(r"\n\s*\n", text) if p.strip()]
return "\n\n".join(paragraphs).strip()
for name, md_text in fetched:
if not md_text:
continue
ch_m = chapter_h_re.search(md_text)
if not ch_m:
continue
chapter_id = ch_m.group(1)
sec_matches = list(section_h_re.finditer(md_text))
# Chapter control objective: between # V<N> and the first ## V<N>.<M>
co_start = ch_m.end()
co_end = sec_matches[0].start() if sec_matches else len(md_text)
co = _clean(md_text[co_start:co_end])
if co and chapter_id in chapters:
chapters[chapter_id]["control_objective"] = co
ch_enriched += 1
# Section descriptions
for i, sm in enumerate(sec_matches):
section_id = sm.group(1)
sd_start = sm.end()
sd_end = sec_matches[i + 1].start() if i + 1 < len(sec_matches) else len(md_text)
body = md_text[sd_start:sd_end]
tm = table_start_re.search(body)
if tm:
body = body[:tm.start()]
sd = _clean(body)
if sd and section_id in sections:
sections[section_id]["description"] = sd
sec_enriched += 1
print(f" Markdown enrichment: {ch_enriched}/{len(chapters)} chapters, "
f"{sec_enriched}/{len(sections)} sections", flush=True)
# =============================================================
# Write to data store
# =============================================================
asvs_ns = data_store.use_namespace("asvs")
cleared_count = 0
existing = asvs_ns.list_keys() or []
existing_asvs = [k for k in existing if k.startswith("asvs:")]
if clear_existing:
if existing_asvs:
print(f" Clearing {len(existing_asvs)} existing 'asvs:*' keys", flush=True)
for k in existing_asvs:
asvs_ns.delete(k)
cleared_count = len(existing_asvs)
else:
if existing_asvs:
print(f" Skipping clear; {len(existing_asvs)} existing 'asvs:*' keys "
f"will be overwritten in place where IDs match", flush=True)
print(f" Writing {len(chapters)} chapters...", flush=True)
for chapter_id, ch in chapters.items():
asvs_ns.set(f"asvs:chapters:{chapter_id}", ch)
print(f" Writing {len(sections)} sections...", flush=True)
for section_id, sec in sections.items():
asvs_ns.set(f"asvs:sections:{section_id}", sec)
print(f" Writing {len(requirements)} requirements...", flush=True)
for req_id, req in requirements.items():
asvs_ns.set(f"asvs:requirements:{req_id}", req)
# =============================================================
# Sanity check
# =============================================================
sample_req_id = sorted(
requirements.keys(), key=lambda s: [int(p) for p in s.split(".")]
)[0]
sample_req = asvs_ns.get(f"asvs:requirements:{sample_req_id}")
sample_sec = asvs_ns.get(f"asvs:sections:{sample_req.get('section_id','')}") if sample_req else None
sample_ch = asvs_ns.get(f"asvs:chapters:{sample_req.get('chapter_id','')}") if sample_req else None
sanity_ok = bool(
sample_req and sample_req.get("req_description") and sample_req.get("level")
and sample_sec and sample_sec.get("section_name")
and sample_ch and sample_ch.get("chapter_name")
)
levels = {1: 0, 2: 0, 3: 0}
for r in requirements.values():
lv = r.get("level")
if lv in levels:
levels[lv] += 1
summary_lines = [
f"ASVS {tag} loaded into data_store namespace 'asvs'",
f" Source URL: {csv_url}",
f" Chapters: {len(chapters)}",
f" Sections: {len(sections)}",
f" Requirements: {len(requirements)}",
f" L1: {levels[1]}",
f" L2: {levels[2]}",
f" L3: {levels[3]}",
f" Cleared first: {cleared_count} existing keys" if clear_existing else " Cleared first: no (clear=false)",
f" Chapter objectives populated: {ch_enriched}/{len(chapters)}",
f" Section descriptions populated: {sec_enriched}/{len(sections)}",
f" Sanity check (read-back of {sample_req_id}): {'OK' if sanity_ok else 'FAILED'}",
]
if md_skipped_reason:
summary_lines.append(f" Markdown enrichment skipped: {md_skipped_reason}")
if sample_req:
summary_lines.append(
f" Sample req {sample_req_id}: L{sample_req.get('level','?')} — "
f"{sample_req.get('req_description','')[:120]}"
)
result = "\n".join(summary_lines)
print(result, flush=True)
return {"outputText": result}
finally:
await http_client.aclose()