blob: 13a5d2aad8f087055a13faf908577477be6822e0 [file] [log] [blame]
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
''' physicalplan.py '''
import sys
from typing import Optional
import requests
from tabulate import tabulate
from heron.common.src.python.utils.log import Log
from heron.tools.common.src.python.clients import tracker
def to_table(metrics):
""" normalize raw metrics API result to table """
all_queries = tracker.metric_queries()
m = tracker.queries_map()
header = ['container id'] + [m[k] for k in all_queries if k in metrics.keys()]
stats = []
if not metrics:
return stats, header
names = list(metrics.values())[0].keys()
for n in names:
info = [n]
for field in all_queries:
try:
info.append(str(metrics[field][n]))
except KeyError:
pass
stats.append(info)
return stats, header
def run_metrics(
cluster: str,
role: str,
environment: str,
topology: str,
component: Optional[str],
) -> None:
"""Render a table of metrics."""
try:
result = tracker.get_topology_info(cluster, environment, topology, role)
except requests.ConnectionError as e:
Log.error(f"Fail to connect to tracker: {e}")
sys.exit(1)
all_components = sorted(result['physical_plan']['components'].keys())
if component:
if component not in all_components:
Log.error(f"Unknown component: {component!r}")
sys.exit(1)
components = [component]
else:
components = all_components
all_queries = tracker.metric_queries()
for i, comp in enumerate(components):
try:
result = tracker.get_comp_metrics(
cluster, environment, topology, comp, [], all_queries, [0, -1], role,
)
except requests.ConnectionError as e:
Log.error(f"Fail to connect to tracker: {e}")
sys.exit(1)
stat, header = to_table(result["metrics"])
if i != 0:
print('')
print(f"{comp!r} metrics:")
print(tabulate(stat, headers=header))
def run_containers(
cluster: str,
role: str,
environment: str,
topology: str,
container_id: str,
) -> None:
"""Render a table of container information."""
try:
result = tracker.get_topology_info(cluster, environment, topology, role)
except requests.ConnectionError as e:
Log.error(f"Fail to connect to tracker: {e}")
sys.exit(1)
containers = result['physical_plan']['stmgrs']
all_bolts, all_spouts = set(), set()
for bolts in result['physical_plan']['bolts'].values():
all_bolts |= set(bolts)
for spouts in result['physical_plan']['spouts'].values():
all_spouts |= set(spouts)
stmgrs = sorted(containers.keys())
if container_id is not None:
stmgrs = [stmgrs[container_id]]
table = []
for cid, name in enumerate(stmgrs, (container_id + 1 if container_id else 1)):
instances = containers[name]["instance_ids"]
table.append([
cid,
containers[name]["host"],
containers[name]["port"],
containers[name]["pid"],
len([1 for instance in instances if instance in all_bolts]),
len([1 for instance in instances if instance in all_spouts]),
len(instances),
])
headers = ["container", "host", "port", "pid", "#bolt", "#spout", "#instance"]
print(tabulate(table, headers=headers))