blob: 8e345bced3d817adab66eb403ebc95846986897c [file] [log] [blame]
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
heron-ui provides a web interface for exploring the state of a
cluster as reported by a tracker.
"""
import logging
import os.path
import sys
import time
from collections import Counter
from datetime import datetime
from typing import Callable, List, Optional
from heron.tools.common.src.python.utils import config
from heron.tools.common.src.python.clients import tracker
from heron.common.src.python.utils import log
import click
import pydantic
import requests
import uvicorn
from fastapi import APIRouter, FastAPI, Query, Request
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from starlette.responses import RedirectResponse, Response
from starlette.exceptions import HTTPException as StarletteHTTPException
VERSION = config.get_version_number()
DEFAULT_ADDRESS = "0.0.0.0"
DEFAULT_PORT = 8889
DEFAULT_TRACKER_URL = "http://127.0.0.1:8888"
DEFAULT_BASE_URL = ""
base_url = DEFAULT_BASE_URL
tracker_url = DEFAULT_TRACKER_URL
app = FastAPI(title="Heron UI", version=VERSION)
templates = Jinja2Templates(
directory=os.path.join(sys.path[0], "heron/tools/ui/resources/templates")
)
topologies_router = APIRouter()
@app.get("/")
def home():
"""Redirect from root to topologies listing."""
return RedirectResponse(url=app.url_path_for("topologies_page"))
@topologies_router.get("")
def topologies_page(request: Request) -> Response:
"""Return a rendered list of topologies."""
return templates.TemplateResponse("topologies.html", {
"topologies": [],
"clusters": [str(cluster) for cluster in tracker.get_clusters()],
"active": "topologies",
"baseUrl": base_url,
"request": request,
})
@topologies_router.get("/{cluster}/{environment}/{topology}/config")
def config_page(
cluster: str,
environment: str,
topology: str,
request: Request,
) -> Response:
"""Render a HTML page of config for a topology."""
return templates.TemplateResponse(
"config.html",
{
"cluster": cluster,
"environ": environment,
"topology": topology,
"active": "topologies",
"baseUrl": base_url,
"request": request,
},
)
@topologies_router.get("/{cluster}/{environment}/{topology}/{instance}/{component}/exceptions")
def exceptions_page(
cluster: str, environment: str, topology: str, component: str, instance: str,
request: Request
) -> Response:
"""Render a HTML page of exceptions for a container."""
return templates.TemplateResponse(
"exception.html",
{
"cluster": cluster,
"environ": environment,
"topology": topology,
"comp_name": component,
"instance": instance,
"active": "topologies",
"baseUrl": base_url,
"request": request,
},
)
@topologies_router.get("/{cluster}/{environment}/{topology}")
def planner_page(
cluster: str,
environment: str,
topology: str,
request: Request,
) -> Response:
"""Render a HTML page to show information about a topology."""
execution_state = tracker.get_execution_state(cluster, environment, topology)
scheduler_location = tracker.get_scheduler_location(
cluster, environment, topology
)
# is the tracker really making links for the UI!?
job_page_link = scheduler_location["job_page_link"]
launched_at = datetime.utcfromtimestamp(execution_state["submission_time"])
launched_time = launched_at.isoformat(" ") + "Z"
return templates.TemplateResponse(
"topology.html",
{
"cluster": cluster,
"environ": environment,
"topology": topology,
"execution_state": execution_state,
"launched": launched_time,
"status": "unknown", # supposed to be "running" or "errors", but not implemented
"active": "topologies",
"job_page_link": job_page_link,
"baseUrl": base_url,
"request": request,
},
)
@topologies_router.get("/metrics")
def metrics(
cluster: str,
environ: str,
topology: str,
metric_names: List[str] = Query(None, alias="metricname"),
instances: List[str] = Query(None, alias="instance"),
component: Optional[str] = None,
interval: int = -1,
) -> dict:
"""Return metrics for a given time range."""
time_range = (0, interval)
component_names = (
[component]
if component else
tracker.get_comps(cluster, environ, topology)
)
# could make this async
result = {
# need to port over everything from access to tracker
c: tracker.get_comp_metrics(
cluster, environ, topology, c, instances, metric_names, time_range)
for c in component_names
}
# switching the payload shape is bad, so this should be factored out in the future
if component:
return result[component]
return result
# should factor out the tornado based access module
query_handler = tracker.HeronQueryHandler()
@topologies_router.get("/metrics/timeline")
def timeline(
cluster: str,
environ: str,
topology: str,
metric: str,
instance: str,
starttime: str,
endtime: str,
component: Optional[str] = None,
max: bool = False, # pylint: disable=redefined-builtin
) -> dict:
"""Return metrics for a given time range."""
timerange = (starttime, endtime)
component_names = (
[component]
if component else
tracker.get_comps(cluster, environ, topology)
)
if metric == "backpressure":
result = {
c: query_handler.fetch_backpressure(
cluster, metric, topology, c,
instance, timerange, max, environ,
)
for c in component_names
}
else:
fetch = query_handler.fetch_max if max else query_handler.fetch
result = {
c: fetch(cluster, metric, topology, c, instance, timerange, environ)
for c in component_names
}
# switching the payload shape is bad, so this should be factored out in the future
if component:
return result[component]
return result
@topologies_router.get("/filestats/{cluster}/{environment}/{topology}/{container}/file")
def file_stats_page(
cluster: str, environment: str, topology: str,
container: str, request: Request, path: str = ".",
) -> Response:
"""Render a HTML page for exploring a container's files."""
data = tracker.get_filestats(cluster, environment, topology, container, path)
return templates.TemplateResponse(
"browse.html",
{
"cluster": cluster,
"environ": environment,
"topology": topology,
"container": container,
"path": path,
"filestats": data,
"baseUrl": base_url,
"request": request,
},
)
@topologies_router.get("/{cluster}/{environment}/{topology}/{container}/file")
def file_page(
cluster: str, environment: str, topology: str, container: str, path: str,
request: Request,
) -> Response:
"""Render a HTML page for retrieving a container's file."""
return templates.TemplateResponse(
"file.html",
{
"cluster": cluster,
"environ": environment,
"topology": topology,
"container": container,
"path": path,
"baseUrl": base_url,
"request": request,
},
)
@topologies_router.get("/{cluster}/{environment}/{topology}/{container}/filedata")
def file_data(
cluster: str,
environment: str,
topology: str,
container: str,
offset: int,
length: int,
path: str,
) -> Response:
"""Return a byte-range of data from a container's file."""
# this should just use the byte-range header in a file download method
data = tracker.get_container_file_data(
cluster, environment, topology, container, path, offset, length
)
return data
@topologies_router.get("/{cluster}/{environment}/{topology}/{container}/filedownload")
def file_download(
cluster: str, environment: str, topology: str, container: str, path: str
) -> Response:
"""Return a file from a container."""
filename = os.path.basename(path)
# make a streaming response and use a streaming download client
download_url = tracker.get_container_file_download_url(
cluster, environment, topology, container, path
)
data = requests.get(download_url)
return Response(
content=data.content,
media_type="application/binary",
headers={"Content-Disposition": f"attachment; filename={filename}"},
)
# topology list and plan handlers
class ApiEnvelope(pydantic.BaseModel):
"""Envelope for heron-ui JSON API."""
status: str
message: str
version: str = VERSION
executiontime: int
result: dict
def api_topology_json(method: Callable[[], dict]) -> ApiEnvelope:
"""Wrap the output of a method with a response envelope."""
started = time.time()
result = method()
return ApiEnvelope(
status="success",
message="",
executiontime=time.time() - started,
result=result,
)
@topologies_router.get("/list.json")
def topologies_json() -> dict:
"""Return the (mutated) list of topologies."""
topologies = tracker.get_topologies_states()
result = {}
for c, cluster_value in topologies.items():
result[c] = {}
for e, environment_value in cluster_value.items():
result[c][e] = {}
for t, topology_value in environment_value.items():
if topology_value.get("jobname") is None:
continue
# transforming payloads is usually an indicator of a bad shape
topology_value.setdefault("submission_time", "-")
result[c][e][t] = topology_value
return result
@topologies_router.get(
"/{cluster}/{environment}/{topology}/logicalplan.json", response_model=ApiEnvelope
)
def logical_plan_json(cluster: str, environment: str, topology: str) -> ApiEnvelope:
"""Return the logical plan object for a topology."""
return api_topology_json(lambda: tracker.get_logical_plan(
cluster, environment, topology, None,
))
@topologies_router.get(
"/{cluster}/{environment}/{topology}/packingplan.json", response_model=ApiEnvelope
)
def packing_plan_json(cluster: str, environment: str, topology: str) -> ApiEnvelope:
"""Return the packing plan object for a topology."""
return api_topology_json(lambda: tracker.get_packing_plan(
cluster, environment, topology, None,
))
@topologies_router.get(
"/{cluster}/{environment}/{topology}/physicalplan.json", response_model=ApiEnvelope
)
def physical_plan_json(cluster: str, environment: str, topology: str) -> ApiEnvelope:
"""Return the physical plan object for a topology."""
return api_topology_json(lambda: tracker.get_physical_plan(
cluster, environment, topology, None,
))
@topologies_router.get(
"/{cluster}/{environment}/{topology}/executionstate.json", response_model=ApiEnvelope
)
def execution_state_json(cluster: str, environment: str, topology: str) -> ApiEnvelope:
"""Return the execution state object for a topology."""
return api_topology_json(lambda: tracker.get_execution_state(
cluster, environment, topology,
))
@topologies_router.get(
"/{cluster}/{environment}/{topology}/schedulerlocation.json",
response_model=ApiEnvelope,
)
def scheduler_location_json(cluster: str, environment: str, topology: str) -> ApiEnvelope:
"""Unimplemented method which is currently a duplicate of execution state."""
return api_topology_json(lambda: tracker.get_execution_state(
cluster, environment, topology,
))
@topologies_router.get(
"/{cluster}/{environment}/{topology}/{component}/exceptions.json",
response_model=ApiEnvelope,
)
def exceptions_json(cluster: str, environment: str, topology: str, component: str) -> ApiEnvelope:
"""Return a list of exceptions for a component."""
return api_topology_json(lambda: tracker.get_component_exceptions(
cluster, environment, topology, component,
))
@topologies_router.get(
"/{cluster}/{environment}/{topology}/{component}/exceptionsummary.json",
response_model=ApiEnvelope,
)
def exception_summary_json(
cluster: str, environment: str, topology: str, component: str
) -> ApiEnvelope:
"""Return a table of exception classes to totals."""
started = time.time()
if component.lower() == "all":
logical_plan = tracker.get_logical_plan(cluster, environment, topology)
if not logical_plan or not {"bolts", "spouts"} <= logical_plan.keys():
return {}
# looks like topologies can have spouts but no bolts, so they're assumed to be empty - should
# the above key check be removed and replaced with bolts defaulting to an empty list?
component_names = [*logical_plan["spouts"], *logical_plan["bolts"]]
else:
component_names = [component]
exception_infos = {
c: tracker.get_component_exceptionsummary(cluster, environment, topology, c)
for c in component_names
}
class_counts = Counter()
for exception_logs in exception_infos.values():
for exception_log in exception_logs:
class_counts[exception_log["class_name"]] += int(exception_log["count"])
aggregate_exceptions_table = [
[class_name, str(count)]
for class_name, count in class_counts.items()
]
return ApiEnvelope(
status="success",
message="",
executiontime=time.time() - started,
result=aggregate_exceptions_table,
)
@topologies_router.get(
"/{cluster}/{environment}/{topology}/{instance}/pid"
)
def pid_snippet(
request: Request,
cluster: str,
environment: str,
topology: str,
instance: str,
) -> Response:
"""Render a HTML snippet containing topology output of container."""
physical_plan = tracker.get_physical_plan(cluster, environment, topology)
host = physical_plan["stmgrs"][physical_plan["instances"][instance]["stmgrId"]][
"host"
]
info = tracker.get_instance_pid(cluster, environment, topology, instance)
command = info["command"]
stdout = info["stdout"]
return templates.TemplateResponse(
"shell.snip.html",
{
"request": request,
"host": host,
"command": command,
"output": stdout,
},
)
@topologies_router.get(
"/{cluster}/{environment}/{topology}/{instance}/jstack"
)
def jstack_snippet(
request: Request,
cluster: str,
environment: str,
topology: str,
instance: str,
) -> HTMLResponse:
"""Render a HTML snippet containing jstack output of container."""
physical_plan = tracker.get_physical_plan(cluster, environment, topology)
host = physical_plan["stmgrs"][physical_plan["instances"][instance]["stmgrId"]][
"host"
]
info = tracker.get_instance_jstack(cluster, environment, topology, instance)
command = info["command"]
stdout = info["stdout"]
return templates.TemplateResponse(
"shell.snip.html",
{
"request": request,
"host": host,
"command": command,
"output": stdout,
},
)
@topologies_router.get(
"/{cluster}/{environment}/{topology}/{instance}/jmap"
)
def jmap_snippet(
request: Request,
cluster: str,
environment: str,
topology: str,
instance: str,
) -> HTMLResponse:
"""Render a HTML snippet containing jmap output of container."""
physical_plan = tracker.get_physical_plan(cluster, environment, topology)
host = physical_plan["stmgrs"][physical_plan["instances"][instance]["stmgrId"]][
"host"
]
info = tracker.run_instance_jmap(cluster, environment, topology, instance)
command = info["command"]
stdout = info["stdout"]
info = """
<ul>
<li>May take longer than usual (1-2 minutes) please be patient.</li>
<li>Use SCP to copy heap dump files from host. (SCP {host}:/tmp/heap.bin /tmp/)</li>
</ul>
"""
return templates.TemplateResponse(
"shell.snip.html",
{
"request": request,
"host": host,
"command": command,
"output": stdout,
"info": info,
},
)
@topologies_router.get(
"/{cluster}/{environment}/{topology}/{instance}/histo"
)
def histogram_snippet(
request: Request,
cluster: str,
environment: str,
topology: str,
instance: str,
) -> HTMLResponse:
"""Render a HTML snippet containing jmap histogram output of container."""
# use a function to DRY up these container API methods
physical_plan = tracker.get_physical_plan(cluster, environment, topology)
host = physical_plan["stmgrs"][physical_plan["instances"][instance]["stmgrId"]][
"host"
]
info = tracker.get_instance_mem_histogram(
cluster, environment, topology, instance
)
command = info["command"]
stdout = info["stdout"]
return templates.TemplateResponse(
"shell.snip.html",
{
"request": request,
"host": host,
"command": command,
"output": stdout,
},
)
app.include_router(topologies_router, prefix="/topologies")
app.mount(
"/static",
StaticFiles(directory=os.path.join(sys.path[0], "heron/tools/ui/resources/static")),
name="static",
)
@app.exception_handler(StarletteHTTPException)
async def unicorn_exception_handler(request: Request, exc: StarletteHTTPException) -> Response:
if exc.status_code == 404:
message = "URL not found"
else:
message = str(exc)
return templates.TemplateResponse("error.html", {"errormessage": message, "request": request})
def show_version(_, __, value):
if value:
config.print_build_info()
sys.exit(0)
@click.command()
@click.option("--tracker-url", "tracker_url_option", default=DEFAULT_TRACKER_URL)
@click.option("--base-url", "base_url_option", default=DEFAULT_BASE_URL)
@click.option("--host", default=DEFAULT_ADDRESS)
@click.option("--port", type=int, default=DEFAULT_PORT)
@click.option("--verbose", is_flag=True)
@click.option(
"--version",
is_flag=True,
is_eager=True,
expose_value=False,
callback=show_version,
)
def cli(
host: str, port: int, base_url_option: str, tracker_url_option: str, verbose: bool
) -> None:
"""Start a web UI for heron which renders information from the tracker."""
global base_url, tracker_url
base_url = base_url_option
log.configure(level=logging.DEBUG if verbose else logging.INFO)
tracker.tracker_url = tracker_url_option
uvicorn.run(app, host=host, port=port, log_config=None)
if __name__ == "__main__":
cli() # pylint: disable=no-value-for-parameter