blob: 9ec7d75e06f0959e584e5270d94bc229e25a7825 [file] [log] [blame]
#
# Copyright (C) 2018 Codethink Limited
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
# Tristan Van Berkom <tristan.vanberkom@codethink.co.uk>
import os
import sys
import click
import curses
# Import a widget internal for formatting time codes
from .widget import TimeCode
from .._scheduler import ElementJob
# Status()
#
# A widget for formatting overall status.
#
# Note that the render() and clear() methods in this class are
# simply noops in the case that the application is not connected
# to a terminal, or if the terminal does not support ANSI escape codes.
#
# Args:
# context (Context): The Context
# content_profile (Profile): Formatting profile for content text
# format_profile (Profile): Formatting profile for formatting text
# success_profile (Profile): Formatting profile for success text
# error_profile (Profile): Formatting profile for error text
# stream (Stream): The Stream
# colors (bool): Whether to print the ANSI color codes in the output
#
class Status():
# Table of the terminal capabilities we require and use
_TERM_CAPABILITIES = {
'move_up': 'cuu1',
'move_x': 'hpa',
'clear_eol': 'el'
}
def __init__(self, context,
content_profile, format_profile,
success_profile, error_profile,
stream, colors=False):
self._context = context
self._content_profile = content_profile
self._format_profile = format_profile
self._success_profile = success_profile
self._error_profile = error_profile
self._stream = stream
self._jobs = []
self._last_lines = 0 # Number of status lines we last printed to console
self._spacing = 1
self._colors = colors
self._header = _StatusHeader(context,
content_profile, format_profile,
success_profile, error_profile,
stream)
self._term_width, _ = click.get_terminal_size()
self._alloc_lines = 0
self._alloc_columns = None
self._line_length = 0
self._need_alloc = True
self._term_caps = self._init_terminal()
# add_job()
#
# Adds a job to track in the status area
#
# Args:
# element (Element): The element of the job to track
# action_name (str): The action name for this job
#
def add_job(self, job):
elapsed = self._stream.elapsed_time
job = _StatusJob(self._context, job, self._content_profile, self._format_profile, elapsed)
self._jobs.append(job)
self._need_alloc = True
# remove_job()
#
# Removes a job currently being tracked in the status area
#
# Args:
# element (Element): The element of the job to track
# action_name (str): The action name for this job
#
def remove_job(self, job):
action_name = job.action_name
if not isinstance(job, ElementJob):
element = None
else:
element = job.element
self._jobs = [
job for job in self._jobs
if not (job.element is element and
job.action_name == action_name)
]
self._need_alloc = True
# clear()
#
# Clear the status area, it is necessary to call
# this before printing anything to the console if
# a status area is in use.
#
# To print some logging to the output and then restore
# the status, use the following:
#
# status.clear()
# ... print something to console ...
# status.render()
#
def clear(self):
if not self._term_caps:
return
for _ in range(self._last_lines):
self._move_up()
self._clear_line()
self._last_lines = 0
# render()
#
# Render the status area.
#
# If you are not printing a line in addition to rendering
# the status area, for instance in a timeout, then it is
# not necessary to call clear().
def render(self):
if not self._term_caps:
return
elapsed = self._stream.elapsed_time
self.clear()
self._check_term_width()
self._allocate()
# Nothing to render, early return
if self._alloc_lines == 0:
return
# Before rendering the actual lines, we need to add some line
# feeds for the amount of lines we intend to print first, and
# move cursor position back to the first line
for _ in range(self._alloc_lines + self._header.lines):
click.echo('', err=True)
for _ in range(self._alloc_lines + self._header.lines):
self._move_up()
# Render the one line header
text = self._header.render(self._term_width, elapsed)
click.echo(text, color=self._colors, err=True)
# Now we have the number of columns, and an allocation for
# alignment of each column
n_columns = len(self._alloc_columns)
for line in self._job_lines(n_columns):
text = ''
for job in line:
column = line.index(job)
text += job.render(self._alloc_columns[column] - job.size, elapsed)
# Add spacing between columns
if column < (n_columns - 1):
text += ' ' * self._spacing
# Print the line
click.echo(text, color=self._colors, err=True)
# Track what we printed last, for the next clear
self._last_lines = self._alloc_lines + self._header.lines
###################################################
# Private Methods #
###################################################
# _init_terminal()
#
# Initialize the terminal and return the resolved terminal
# capabilities dictionary.
#
# Returns:
# (dict|None): The resolved terminal capabilities dictionary,
# or None if the terminal does not support all
# of the required capabilities.
#
def _init_terminal(self):
# We need both output streams to be connected to a terminal
if not (sys.stdout.isatty() and sys.stderr.isatty()):
return None
# Initialized terminal, curses might decide it doesnt
# support this terminal
try:
curses.setupterm(os.environ.get('TERM', 'dumb'))
except curses.error:
return None
term_caps = {}
# Resolve the string capabilities we need for the capability
# names we need.
#
for capname, capval in self._TERM_CAPABILITIES.items():
code = curses.tigetstr(capval)
# If any of the required capabilities resolve empty strings or None,
# then we don't have the capabilities we need for a status bar on
# this terminal.
if not code:
return None
# Decode sequences as latin1, as they are always 8-bit bytes,
# so when b'\xff' is returned, this must be decoded to u'\xff'.
#
# This technique is employed by the python blessings library
# as well, and should provide better compatibility with most
# terminals.
#
term_caps[capname] = code.decode('latin1')
return term_caps
def _check_term_width(self):
term_width, _ = click.get_terminal_size()
if self._term_width != term_width:
self._term_width = term_width
self._need_alloc = True
def _move_up(self):
assert self._term_caps is not None
# Explicitly move to beginning of line, fixes things up
# when there was a ^C or ^Z printed to the terminal.
move_x = curses.tparm(self._term_caps['move_x'].encode('latin1'), 0)
move_x = move_x.decode('latin1')
move_up = curses.tparm(self._term_caps['move_up'].encode('latin1'))
move_up = move_up.decode('latin1')
click.echo(move_x + move_up, nl=False, err=True)
def _clear_line(self):
assert self._term_caps is not None
clear_eol = curses.tparm(self._term_caps['clear_eol'].encode('latin1'))
clear_eol = clear_eol.decode('latin1')
click.echo(clear_eol, nl=False, err=True)
def _allocate(self):
if not self._need_alloc:
return
# State when there is no jobs to display
alloc_lines = 0
alloc_columns = []
line_length = 0
# Test for the widest width which fits columnized jobs
for columns in reversed(range(len(self._jobs))):
alloc_lines, alloc_columns = self._allocate_columns(columns + 1)
# If the sum of column widths with spacing in between
# fits into the terminal width, this is a good allocation.
line_length = sum(alloc_columns) + (columns * self._spacing)
if line_length < self._term_width:
break
self._alloc_lines = alloc_lines
self._alloc_columns = alloc_columns
self._line_length = line_length
self._need_alloc = False
def _job_lines(self, columns):
for i in range(0, len(self._jobs), columns):
yield self._jobs[i:i + columns]
# Returns an array of integers representing the maximum
# length in characters for each column, given the current
# list of jobs to render.
#
def _allocate_columns(self, columns):
column_widths = [0 for _ in range(columns)]
lines = 0
for line in self._job_lines(columns):
line_len = len(line)
lines += 1
for col in range(columns):
if col < line_len:
job = line[col]
column_widths[col] = max(column_widths[col], job.size)
return lines, column_widths
# _StatusHeader()
#
# A delegate object for rendering the header part of the Status() widget
#
# Args:
# context (Context): The Context
# content_profile (Profile): Formatting profile for content text
# format_profile (Profile): Formatting profile for formatting text
# success_profile (Profile): Formatting profile for success text
# error_profile (Profile): Formatting profile for error text
# stream (Stream): The Stream
#
class _StatusHeader():
def __init__(self, context,
content_profile, format_profile,
success_profile, error_profile,
stream):
#
# Public members
#
self.lines = 3
#
# Private members
#
self._content_profile = content_profile
self._format_profile = format_profile
self._success_profile = success_profile
self._error_profile = error_profile
self._stream = stream
self._time_code = TimeCode(context, content_profile, format_profile)
self._context = context
def render(self, line_length, elapsed):
project = self._context.get_toplevel_project()
line_length = max(line_length, 80)
#
# Line 1: Session time, project name, session / total elements
#
# ========= 00:00:00 project-name (143/387) =========
#
session = str(len(self._stream.session_elements))
total = str(len(self._stream.total_elements))
size = 0
text = ''
size += len(total) + len(session) + 4 # Size for (N/N) with a leading space
size += 8 # Size of time code
size += len(project.name) + 1
text += self._time_code.render_time(elapsed)
text += ' ' + self._content_profile.fmt(project.name)
text += ' ' + self._format_profile.fmt('(') + \
self._content_profile.fmt(session) + \
self._format_profile.fmt('/') + \
self._content_profile.fmt(total) + \
self._format_profile.fmt(')')
line1 = self._centered(text, size, line_length, '=')
#
# Line 2: Dynamic list of queue status reports
#
# (Fetched:0 117 0)→ (Built:4 0 0)
#
size = 0
text = ''
# Format and calculate size for each queue progress
for queue in self._stream.queues:
# Add spacing
if self._stream.queues.index(queue) > 0:
size += 2
text += self._format_profile.fmt('→ ')
queue_text, queue_size = self._render_queue(queue)
size += queue_size
text += queue_text
line2 = self._centered(text, size, line_length, ' ')
#
# Line 3: Cache usage percentage report
#
# ~~~~~~ cache: 69% ~~~~~~
#
usage = self._context.get_artifact_cache_usage()
usage_percent = '{}%'.format(usage.used_percent)
size = 21
size += len(usage_percent)
if usage.used_percent >= 95:
formatted_usage_percent = self._error_profile.fmt(usage_percent)
elif usage.used_percent >= 80:
formatted_usage_percent = self._content_profile.fmt(usage_percent)
else:
formatted_usage_percent = self._success_profile.fmt(usage_percent)
text = self._format_profile.fmt("~~~~~~ ") + \
self._content_profile.fmt('cache') + \
self._format_profile.fmt(': ') + \
formatted_usage_percent + \
self._format_profile.fmt(' ~~~~~~')
line3 = self._centered(text, size, line_length, ' ')
return line1 + '\n' + line2 + '\n' + line3
###################################################
# Private Methods #
###################################################
def _render_queue(self, queue):
processed = str(len(queue.processed_elements))
skipped = str(len(queue.skipped_elements))
failed = str(len(queue.failed_elements))
size = 5 # Space for the formatting '[', ':', ' ', ' ' and ']'
size += len(queue.complete_name)
size += len(processed) + len(skipped) + len(failed)
text = self._format_profile.fmt("(") + \
self._content_profile.fmt(queue.complete_name) + \
self._format_profile.fmt(":") + \
self._success_profile.fmt(processed) + ' ' + \
self._content_profile.fmt(skipped) + ' ' + \
self._error_profile.fmt(failed) + \
self._format_profile.fmt(")")
return (text, size)
def _centered(self, text, size, line_length, fill):
remaining = line_length - size
remaining -= 2
final_text = self._format_profile.fmt(fill * (remaining // 2)) + ' '
final_text += text
final_text += ' ' + self._format_profile.fmt(fill * (remaining // 2))
return final_text
# _StatusJob()
#
# A delegate object for rendering a job in the status area
#
# Args:
# context (Context): The Context
# job (Job): The job being processed
# content_profile (Profile): Formatting profile for content text
# format_profile (Profile): Formatting profile for formatting text
# elapsed (datetime): The offset into the session when this job is created
#
class _StatusJob():
def __init__(self, context, job, content_profile, format_profile, elapsed):
action_name = job.action_name
if not isinstance(job, ElementJob):
element = None
else:
element = job.element
#
# Public members
#
self.element = element # The Element
self.action_name = action_name # The action name
self.size = None # The number of characters required to render
self.full_name = element._get_full_name() if element else action_name
#
# Private members
#
self._offset = elapsed
self._content_profile = content_profile
self._format_profile = format_profile
self._time_code = TimeCode(context, content_profile, format_profile)
# Calculate the size needed to display
self.size = 10 # Size of time code with brackets
self.size += len(action_name)
self.size += len(self.full_name)
self.size += 3 # '[' + ':' + ']'
# render()
#
# Render the Job, return a rendered string
#
# Args:
# padding (int): Amount of padding to print in order to align with columns
# elapsed (datetime): The session elapsed time offset
#
def render(self, padding, elapsed):
text = self._format_profile.fmt('[') + \
self._time_code.render_time(elapsed - self._offset) + \
self._format_profile.fmt(']')
# Add padding after the display name, before terminating ']'
name = self.full_name + (' ' * padding)
text += self._format_profile.fmt('[') + \
self._content_profile.fmt(self.action_name) + \
self._format_profile.fmt(':') + \
self._content_profile.fmt(name) + \
self._format_profile.fmt(']')
return text