# blob: f2609b38c33f7cc81909b7e80202eb4b0095bb79 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Fix missing schema references in the OpenAPI spec.
This script patches the openapi.json file to add any missing schemas
that are referenced but not defined.
"""
import json # noqa: TID251 - standalone docs script
import sys
from pathlib import Path
from typing import Any
def add_missing_schemas(spec: dict[str, Any]) -> tuple[dict[str, Any], list[str]]:
    """Insert fallback definitions for schemas absent from the spec.

    Every known schema name is looked up in ``components.schemas``; the ones
    that are not present receive a hand-written definition. The schemas
    mapping is then stored back under ``spec["components"]["schemas"]``.

    Returns:
        The same ``spec`` object and the list of schema names that were added,
        in insertion order.
    """

    def prop(kind: str, text: str, **middle: Any) -> dict[str, Any]:
        # Key order is type -> extra keyword -> description so the serialized
        # JSON keeps a stable layout.
        return {"type": kind, **middle, "description": text}

    fallbacks: dict[str, dict[str, Any]] = {
        # Based on superset/dashboards/schemas.py
        "DashboardScreenshotPostSchema": {
            "type": "object",
            "properties": {
                "dataMask": {
                    "type": "object",
                    "description": "An object representing the data mask.",
                    "additionalProperties": True,
                },
                "activeTabs": prop(
                    "array",
                    "A list representing active tabs.",
                    items={"type": "string"},
                ),
                "anchor": prop("string", "A string representing the anchor."),
                "urlParams": prop(
                    "array",
                    "A list of tuples, each containing two strings.",
                    items={
                        "type": "array",
                        "items": {"type": "string"},
                        "minItems": 2,
                        "maxItems": 2,
                    },
                ),
            },
        },
        # Based on superset/dashboards/schemas.py
        "DashboardNativeFiltersConfigUpdateSchema": {
            "type": "object",
            "properties": {
                "deleted": prop(
                    "array",
                    "List of deleted filter IDs.",
                    items={"type": "string"},
                ),
                "modified": prop(
                    "array",
                    "List of modified filter configurations.",
                    items={"type": "object"},
                ),
                "reordered": prop(
                    "array",
                    "List of filter IDs in new order.",
                    items={"type": "string"},
                ),
            },
        },
        # Based on superset/dashboards/schemas.py
        "DashboardColorsConfigUpdateSchema": {
            "type": "object",
            "properties": {
                "color_namespace": prop(
                    "string", "The color namespace.", nullable=True
                ),
                "color_scheme": prop(
                    "string", "The color scheme name.", nullable=True
                ),
                "map_label_colors": prop(
                    "object",
                    "Mapping of labels to colors.",
                    additionalProperties={"type": "string"},
                ),
                "shared_label_colors": prop(
                    "object",
                    "Shared label colors across charts.",
                    additionalProperties={"type": "string"},
                ),
                "label_colors": prop(
                    "object",
                    "Label to color mapping.",
                    additionalProperties={"type": "string"},
                ),
                "color_scheme_domain": prop(
                    "array",
                    "Color scheme domain values.",
                    items={"type": "string"},
                ),
            },
        },
        # Based on superset/sqllab/schemas.py
        "FormatQueryPayloadSchema": {
            "type": "object",
            "required": ["sql"],
            "properties": {
                "sql": prop("string", "The SQL query to format."),
                "engine": prop("string", "The database engine.", nullable=True),
                "database_id": prop("integer", "The database id.", nullable=True),
                "template_params": prop(
                    "string",
                    "The SQL query template params as JSON string.",
                    nullable=True,
                ),
            },
        },
        # Based on superset/reports/schemas.py
        "get_slack_channels_schema": {
            "type": "object",
            "properties": {
                "search_string": prop(
                    "string", "String to search for in channel names."
                ),
                "types": prop(
                    "array",
                    "Types of channels to search.",
                    items={
                        "type": "string",
                        "enum": ["public_channel", "private_channel"],
                    },
                ),
                "exact_match": prop(
                    "boolean", "Whether to match channel names exactly."
                ),
            },
        },
    }

    schemas = spec.get("components", {}).get("schemas", {})
    fixed = []
    for schema_name, definition in fallbacks.items():
        if schema_name in schemas:
            continue
        schemas[schema_name] = definition
        fixed.append(schema_name)

    # Re-attach the mapping: it may be a fresh dict when the spec had no
    # "components" or no "schemas" entry.
    spec.setdefault("components", {})["schemas"] = schemas
    return spec, fixed
def path_to_operation_id(path: str, method: str) -> str:
    """Convert a path and method to an operationId.

    The leading ``/api/v1/`` prefix is dropped, path parameters such as
    ``{pk}`` become ``by_pk``, and ``/`` and ``-`` are turned into ``_``.
    The result is prefixed with a verb derived from the HTTP method
    (``post`` -> ``create``, ``put`` -> ``update``; unknown methods are used
    lowercased as-is).

    Args:
        path: The endpoint path, e.g. ``/api/v1/chart/{pk}``.
        method: The HTTP method, in any letter case.

    Returns:
        The generated operationId, e.g. ``get_chart_by_pk``.
    """
    # Strip only the leading prefix; a plain replace() would also delete any
    # later "/api/v1/" occurrence and fuse the neighbouring segments together.
    clean_path = path.removeprefix("/api/v1/").strip("/")
    # Replace path parameters
    clean_path = clean_path.replace("{", "by_").replace("}", "")
    # Create operation name
    verb = method.lower()
    method_prefix = {
        "get": "get",
        "post": "create",
        "put": "update",
        "delete": "delete",
        "patch": "patch",
    }.get(verb, verb)
    return f"{method_prefix}_{clean_path}".replace("/", "_").replace("-", "_")
def path_to_summary(path: str, method: str) -> str:
    """Generate a human-readable summary from path and method.

    The leading ``/api/v1/`` prefix is dropped, path parameters such as
    ``{pk}`` are rendered as ``by pk``, and ``_``/``-`` inside the remaining
    segments become spaces. The summary starts with a verb derived from the
    HTTP method (``post`` -> ``Create``, ``put``/``patch`` -> ``Update``;
    unknown methods are capitalized as-is).

    Args:
        path: The endpoint path, e.g. ``/api/v1/chart/{pk}``.
        method: The HTTP method, in any letter case.

    Returns:
        The summary text, e.g. ``Get chart by pk``. When the path has no
        segments left after stripping, only the verb is returned.
    """
    # Strip only the leading prefix; a plain replace() would also delete any
    # later "/api/v1/" occurrence and fuse the neighbouring segments together.
    clean_path = path.removeprefix("/api/v1/").strip("/")
    # Handle path parameters
    parts = []
    for part in clean_path.split("/"):
        if not part:
            # Empty segments (empty path, doubled slashes) would otherwise
            # leave stray spaces in the summary.
            continue
        if part.startswith("{") and part.endswith("}"):
            parts.append(f"by {part[1:-1]}")
        else:
            parts.append(part.replace("_", " ").replace("-", " "))
    method_verb = {
        "get": "Get",
        "post": "Create",
        "put": "Update",
        "delete": "Delete",
        "patch": "Update",
    }.get(method.lower(), method.capitalize())
    return " ".join([method_verb, *parts])
def add_missing_operation_ids(spec: dict[str, Any]) -> int:
    """Give an operationId and a summary to operations that have neither.

    Only ``get``/``post``/``put``/``delete``/``patch`` entries whose value is
    a dict are considered. An operation that already carries a non-empty
    ``summary`` or ``operationId`` is left untouched.

    Returns:
        The number of operations that were patched.
    """
    http_methods = ("get", "post", "put", "delete", "patch")
    patched = 0
    for route, operations in spec.get("paths", {}).items():
        for verb, operation in operations.items():
            if verb not in http_methods or not isinstance(operation, dict):
                continue
            if operation.get("summary") or operation.get("operationId"):
                continue
            operation["operationId"] = path_to_operation_id(route, verb)
            operation["summary"] = path_to_summary(route, verb)
            patched += 1
    return patched
# Curated descriptions keyed by the tag name used on operations. Tags that
# are not listed here get a generic description in add_tag_definitions().
TAG_DESCRIPTIONS: dict[str, str] = {
    "Advanced Data Type": "Advanced data type operations and conversions.",
    "Annotation Layers": "Manage annotation layers and annotations for charts.",
    "AsyncEventsRestApi": "Real-time event streaming via Server-Sent Events (SSE).",
    "Available Domains": "Get available domains for the Superset instance.",
    "CSS Templates": "Manage CSS templates for custom dashboard styling.",
    "CacheRestApi": "Cache management and invalidation operations.",
    "Charts": "Create, read, update, and delete charts (slices).",
    "Current User": "Get information about the authenticated user.",
    "Dashboard Filter State": "Manage temporary filter state for dashboards.",
    "Dashboard Permanent Link": "Permanent links to dashboard states.",
    "Dashboards": "Create, read, update, and delete dashboards.",
    "Database": "Manage database connections and metadata.",
    "Datasets": "Manage datasets (tables) used for building charts.",
    "Datasources": "Query datasource metadata and column values.",
    "Embedded Dashboard": "Configure embedded dashboard settings.",
    "Explore": "Chart exploration and data querying endpoints.",
    "Explore Form Data": "Manage temporary form data for chart exploration.",
    "Explore Permanent Link": "Permanent links to chart explore states.",
    "Import/export": "Import and export Superset assets.",
    "LogRestApi": "Access audit logs and activity history.",
    "Menu": "Get the Superset menu structure.",
    "OpenApi": "Access the OpenAPI specification.",
    "Queries": "View and manage SQL Lab query history.",
    "Report Schedules": "Configure scheduled reports and alerts.",
    "Row Level Security": "Manage row-level security rules for data access.",
    "SQL Lab": "Execute SQL queries and manage SQL Lab sessions.",
    "SQL Lab Permanent Link": "Permanent links to SQL Lab states.",
    "Security": "Authentication and token management.",
    "Security Permissions": "View available permissions.",
    "Security Permissions on Resources (View Menus)": "Permission-resource mappings.",
    "Security Resources (View Menus)": "Manage security resources (view menus).",
    "Security Roles": "Manage security roles and their permissions.",
    "Security Users": "Manage user accounts.",
    "Tags": "Organize assets with tags.",
    "User": "User profile and preferences.",
}
def generate_code_sample(
    method: str, path: str, has_body: bool = False
) -> list[dict[str, str]]:
    """Generate code samples for an endpoint in multiple languages.

    Three samples are produced, in this order: cURL, Python (``requests``)
    and JavaScript (``fetch``). ``{pk}`` and ``{id_or_slug}`` placeholders in
    the path are replaced by ``1``; any other placeholder is left as-is.

    Args:
        method: The HTTP method, in any letter case.
        path: The endpoint path, e.g. ``/api/v1/chart/{pk}``.
        has_body: Whether the cURL sample should send a JSON body. The Python
            and JavaScript samples decide from the method instead: ``post``,
            ``put`` and ``patch`` send a JSON body.

    Returns:
        A list of ``{"lang", "label", "source"}`` dicts.
    """
    # Clean up path for display
    example_path = path.replace("{pk}", "1").replace("{id_or_slug}", "1")
    url_line = f'"http://localhost:8088{example_path}",'
    verb = method.lower()
    # PATCH is handled like POST/PUT in both languages; previously only the
    # JavaScript sample sent a body for PATCH while the Python one did not.
    sends_json = verb in ("post", "put", "patch")

    # cURL sample: one option per line, joined with a shell line continuation.
    curl_parts = [
        f'curl -X {method.upper()} "http://localhost:8088{example_path}"',
        '-H "Authorization: Bearer $ACCESS_TOKEN"',
    ]
    if has_body:
        curl_parts.append('-H "Content-Type: application/json"')
        curl_parts.append('-d \'{"key": "value"}\'')
    curl_cmd = " \\\n ".join(curl_parts)

    # Python sample
    python_lines = ["import requests", f"response = requests.{verb}(", url_line]
    if sends_json:
        python_lines.append('headers={"Authorization": "Bearer " + access_token},')
        python_lines.append('json={"key": "value"}')
    else:
        python_lines.append('headers={"Authorization": "Bearer " + access_token}')
    python_lines.append(")")
    # DELETE samples print the status code instead of decoding a JSON body.
    python_lines.append(
        "print(response.status_code)" if verb == "delete" else "print(response.json())"
    )
    python_code = "\n".join(python_lines)

    # JavaScript sample
    js_lines = ["const response = await fetch(", url_line, "{"]
    if verb != "get":
        # The GET sample leaves the method out of the fetch options.
        js_lines.append(f'method: "{method.upper()}",')
    js_lines.append("headers: {")
    if sends_json:
        js_lines.append('"Authorization": `Bearer ${accessToken}`,')
        js_lines.append('"Content-Type": "application/json"')
        js_lines.append("},")
        js_lines.append('body: JSON.stringify({ key: "value" })')
    else:
        js_lines.append('"Authorization": `Bearer ${accessToken}`')
        js_lines.append("}")
    js_lines.append("}")
    js_lines.append(");")
    if verb == "get" or sends_json:
        js_lines.append("const data = await response.json();")
        js_lines.append("console.log(data);")
    else:
        js_lines.append("console.log(response.status);")
    js_code = "\n".join(js_lines)

    return [
        {"lang": "cURL", "label": "cURL", "source": curl_cmd},
        {"lang": "Python", "label": "Python", "source": python_code},
        {"lang": "JavaScript", "label": "JavaScript", "source": js_code},
    ]
def add_code_samples(spec: dict[str, Any]) -> int:
    """Attach ``x-codeSamples`` to every operation that lacks them.

    Only ``get``/``post``/``put``/``delete``/``patch`` entries whose value is
    a dict are considered; operations that already have ``x-codeSamples`` are
    skipped. Whether the operation declares a ``requestBody`` is forwarded to
    the sample generator.

    Returns:
        The number of operations that received samples.
    """
    http_methods = ("get", "post", "put", "delete", "patch")
    added = 0
    for route, operations in spec.get("paths", {}).items():
        for verb, operation in operations.items():
            eligible = (
                verb in http_methods
                and isinstance(operation, dict)
                and "x-codeSamples" not in operation
            )
            if not eligible:
                continue
            operation["x-codeSamples"] = generate_code_sample(
                verb, route, "requestBody" in operation
            )
            added += 1
    return added
def configure_servers(spec: dict[str, Any]) -> bool:
    """Install the default ``servers`` list unless one is already in place.

    The spec counts as already configured when it lists at least two servers
    and at least one of them declares ``variables``. Otherwise ``servers`` is
    replaced by a local development entry plus a templated
    ``{protocol}://{host}:{port}`` entry.

    Returns:
        True when ``spec["servers"]`` was replaced, False when left alone.
    """
    current = spec.get("servers", [])
    already_configured = len(current) >= 2 and any(
        "variables" in entry for entry in current
    )
    if already_configured:
        return False

    local_server = {
        "url": "http://localhost:8088",
        "description": "Local development server",
    }
    custom_server = {
        "url": "{protocol}://{host}:{port}",
        "description": "Custom server",
        "variables": {
            "protocol": {
                "default": "http",
                "enum": ["http", "https"],
                "description": "HTTP protocol",
            },
            "host": {
                "default": "localhost",
                "description": "Server hostname or IP",
            },
            "port": {
                "default": "8088",
                "description": "Server port",
            },
        },
    }
    spec["servers"] = [local_server, custom_server]
    return True
def add_tag_definitions(spec: dict[str, Any]) -> int:
    """Define a described top-level tag for every tag used by an operation.

    Tags are gathered from ``get``/``post``/``put``/``delete``/``patch``
    operations and sorted by name. Each one gets its description from
    ``TAG_DESCRIPTIONS`` or, failing that, a generic sentence. The top-level
    ``tags`` list is replaced by these definitions when at least one of them
    is not already listed there, or when the spec has no tags at all.

    Returns:
        The number of tag definitions written, or 0 when nothing was changed.
    """
    http_methods = ("get", "post", "put", "delete", "patch")

    # Collect all unique tags used in operations
    used_tags: set[str] = set()
    for operations in spec.get("paths", {}).values():
        for verb, operation in operations.items():
            if verb in http_methods and isinstance(operation, dict):
                used_tags.update(operation.get("tags", []))

    tag_definitions = [
        {
            "name": tag,
            "description": TAG_DESCRIPTIONS.get(tag, f"Endpoints related to {tag}."),
        }
        for tag in sorted(used_tags)
    ]

    known_names = {entry.get("name") for entry in spec.get("tags", [])}
    introduces_new = any(
        definition["name"] not in known_names for definition in tag_definitions
    )
    if not introduces_new and spec.get("tags"):
        return 0

    spec["tags"] = tag_definitions
    return len(tag_definitions)
def generate_example_from_schema( # noqa: C901
schema: dict[str, Any],
spec: dict[str, Any],
depth: int = 0,
max_depth: int = 5,
) -> dict[str, Any] | list[Any] | str | int | float | bool | None:
"""Generate an example value from an OpenAPI schema definition."""
if depth > max_depth:
return None
# Handle $ref
if "$ref" in schema:
ref_path = schema["$ref"]
if ref_path.startswith("#/components/schemas/"):
schema_name = ref_path.split("/")[-1]
ref_schema = (
spec.get("components", {}).get("schemas", {}).get(schema_name, {})
)
return generate_example_from_schema(ref_schema, spec, depth + 1, max_depth)
return None
# If schema already has an example, use it
if "example" in schema:
return schema["example"]
schema_type = schema.get("type", "object")
if schema_type == "object":
properties = schema.get("properties", {})
if not properties:
# Check for additionalProperties
if schema.get("additionalProperties"):
return {"key": "value"}
return {}
result = {}
for prop_name, prop_schema in properties.items():
# Limit object depth and skip large nested objects
if depth < max_depth:
example_val = generate_example_from_schema(
prop_schema, spec, depth + 1, max_depth
)
if example_val is not None:
result[prop_name] = example_val
return result
elif schema_type == "array":
items_schema = schema.get("items", {})
if items_schema:
item_example = generate_example_from_schema(
items_schema, spec, depth + 1, max_depth
)
if item_example is not None:
return [item_example]
return []
elif schema_type == "string":
# Check for enum
if "enum" in schema:
return schema["enum"][0]
# Check for format
fmt = schema.get("format", "")
if fmt == "date-time":
return "2024-01-15T10:30:00Z"
elif fmt == "date":
return "2024-01-15"
elif fmt == "email":
return "user@example.com"
elif fmt == "uri" or fmt == "url":
return "https://example.com"
elif fmt == "uuid":
return "550e8400-e29b-41d4-a716-446655440000"
# Use description hints or prop name
return "string"
elif schema_type == "integer":
if "minimum" in schema:
return schema["minimum"]
return 1
elif schema_type == "number":
if "minimum" in schema:
return schema["minimum"]
return 1.0
elif schema_type == "boolean":
return True
elif schema_type == "null":
return None
# Handle oneOf, anyOf
if "oneOf" in schema and schema["oneOf"]:
return generate_example_from_schema(
schema["oneOf"][0], spec, depth + 1, max_depth
)
if "anyOf" in schema and schema["anyOf"]:
return generate_example_from_schema(
schema["anyOf"][0], spec, depth + 1, max_depth
)
return None
def add_response_examples(spec: dict[str, Any]) -> int:  # noqa: C901
    """Fill in missing ``example`` values on JSON responses.

    Shared error responses under ``components.responses`` receive a canned
    message for the known status codes. Inline ``application/json`` responses
    on ``get``/``post``/``put``/``delete``/``patch`` operations get an example
    generated from their schema. Existing examples are never overwritten, and
    generated examples that are None or ``{}`` are discarded.

    Returns:
        The number of examples that were added.
    """
    error_examples = {
        "400": {"message": "Bad request: Invalid parameters provided"},
        "401": {"message": "Unauthorized: Authentication required"},
        "403": {
            "message": "Forbidden: You don't have permission to access this resource"
        },
        "404": {"message": "Not found: The requested resource does not exist"},
        "422": {"message": "Unprocessable entity: Validation error"},
        "500": {"message": "Internal server error: An unexpected error occurred"},
    }
    http_methods = ("get", "post", "put", "delete", "patch")
    added = 0

    # Shared error responses declared in components
    shared_responses = spec.get("components", {}).get("responses", {})
    for status, sample in error_examples.items():
        if status not in shared_responses:
            continue
        media = shared_responses[status].get("content", {}).get("application/json", {})
        if media and "example" not in media:
            media["example"] = sample
            added += 1

    # Inline response schemas declared on operations
    for operations in spec.get("paths", {}).values():
        for verb, operation in operations.items():
            if verb not in http_methods or not isinstance(operation, dict):
                continue
            for response in operation.get("responses", {}).values():
                # $ref responses point at the shared ones handled above
                if "$ref" in response:
                    continue
                media = response.get("content", {}).get("application/json", {})
                if not media or "example" in media:
                    continue
                schema = media.get("schema", {})
                if not schema:
                    continue
                generated = generate_example_from_schema(
                    schema, spec, depth=0, max_depth=3
                )
                if generated is not None and generated != {}:
                    media["example"] = generated
                    added += 1
    return added
def add_request_body_examples(spec: dict[str, Any]) -> int:
    """Fill in missing ``example`` values on JSON request bodies.

    Only ``post``/``put``/``patch`` operations with an inline (non-``$ref``)
    ``requestBody`` are considered. The example is generated from the
    ``application/json`` schema; existing examples are kept, and generated
    examples that are None or ``{}`` are discarded.

    Returns:
        The number of examples that were added.
    """
    body_methods = ("post", "put", "patch")
    added = 0
    for operations in spec.get("paths", {}).values():
        for verb, operation in operations.items():
            if verb not in body_methods or not isinstance(operation, dict):
                continue
            body = operation.get("requestBody", {})
            if not body or "$ref" in body:
                continue
            media = body.get("content", {}).get("application/json", {})
            # Nothing to do without JSON content or when an example exists
            if not media or "example" in media:
                continue
            schema = media.get("schema", {})
            if not schema:
                continue
            generated = generate_example_from_schema(
                schema, spec, depth=0, max_depth=4
            )
            if generated is not None and generated != {}:
                media["example"] = generated
                added += 1
    return added
def make_summaries_unique(spec: dict[str, Any]) -> int:  # noqa: C901
    """Make duplicate summaries unique by adding context from the path.

    Summaries shared by more than one ``get``/``post``/``put``/``delete``/
    ``patch`` operation get a ``(<path-slug>)`` suffix, where the slug is the
    path without its leading ``/api/v1/`` prefix and parameter braces, and
    with ``/`` and ``_`` turned into ``-``. A summary that already contains
    its slug (compared case-insensitively) is left alone, which keeps
    repeated runs from stacking suffixes.

    Operations on the same path that share a summary receive the same suffix
    and therefore remain identical to each other.

    Returns:
        The number of summaries that were rewritten.
    """
    http_methods = ("get", "post", "put", "delete", "patch")

    def iter_operations():
        # Yield (path, operation) for every HTTP operation object in the spec.
        for path, methods in spec.get("paths", {}).items():
            for method, details in methods.items():
                if method in http_methods and isinstance(details, dict):
                    yield path, details

    # First pass: count how often each summary is used (regardless of method)
    usage: dict[str, int] = {}
    for _path, details in iter_operations():
        summary = details.get("summary")
        if summary:
            usage[summary] = usage.get(summary, 0) + 1

    # Second pass: make duplicate summaries unique
    fixed_count = 0
    for path, details in iter_operations():
        summary = details.get("summary")
        if not summary or usage.get(summary, 0) < 2:
            continue
        # e.g., /api/v1/chart/{pk}/cache_screenshot/ -> "chart-pk-cache-screenshot"
        # Strip only the leading prefix; replace() would also remove later
        # "/api/v1/" occurrences and fuse the neighbouring segments.
        clean_path = path.removeprefix("/api/v1/").strip("/")
        clean_path = clean_path.replace("{", "").replace("}", "")
        path_slug = clean_path.replace("/", "-").replace("_", "-")
        # Both sides are lowercased; comparing a mixed-case slug with a
        # lowercased summary would never match and re-append on every run.
        if path_slug.lower() in summary.lower():
            continue
        details["summary"] = f"{summary} ({path_slug})"
        fixed_count += 1
    return fixed_count
def main() -> None:  # noqa: C901
    """Load the OpenAPI spec, apply every fixer, and save it if it changed.

    The spec is read from ``static/resources/openapi.json`` next to this
    script's parent directory. The process exits with status 1 when the file
    does not exist. The file is rewritten only when at least one fixer
    reported a change.
    """
    spec_path = Path(__file__).parent.parent / "static" / "resources" / "openapi.json"
    if not spec_path.exists():
        print(f"Error: OpenAPI spec not found at {spec_path}", file=sys.stderr)
        sys.exit(1)

    print(f"Reading OpenAPI spec from {spec_path}")
    with spec_path.open(encoding="utf-8") as source:
        spec = json.load(source)

    # Every message passed to report() stands for one kind of change applied.
    reported: list[str] = []

    def report(message: str) -> None:
        print(message)
        reported.append(message)

    # These fixers run up front; their results are reported further down.
    spec, fixed_schemas = add_missing_schemas(spec)
    fixed_ops = add_missing_operation_ids(spec)
    fixed_tags = add_tag_definitions(spec)
    fixed_servers = configure_servers(spec)

    if fixed_servers:
        report("Configured server URLs with variables for flexible API testing")

    fixed_samples = add_code_samples(spec)
    if fixed_samples:
        report(f"Added code samples to {fixed_samples} endpoints")

    fixed_examples = add_response_examples(spec)
    if fixed_examples:
        report(f"Added example JSON responses to {fixed_examples} response schemas")

    fixed_request_examples = add_request_body_examples(spec)
    if fixed_request_examples:
        report(f"Added example JSON to {fixed_request_examples} request bodies")

    if fixed_schemas:
        report(f"Added missing schemas: {', '.join(fixed_schemas)}")
    if fixed_ops:
        report(f"Added operationId/summary to {fixed_ops} operations")
    if fixed_tags:
        report(f"Added {fixed_tags} tag definitions with descriptions")

    fixed_summaries = make_summaries_unique(spec)
    if fixed_summaries:
        report(f"Made {fixed_summaries} duplicate summaries unique")

    if not reported:
        print("No fixes needed")
        return

    with spec_path.open("w", encoding="utf-8") as target:
        json.dump(spec, target, indent=2)
        target.write("\n")  # Ensure trailing newline for pre-commit
    print(f"Updated {spec_path}")
# Run the fixer only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()