| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| # Authors: |
| # Tristan Van Berkom <tristan.vanberkom@codethink.co.uk> |
| import os |
| import sys |
| import curses |
| from collections import OrderedDict |
| import shutil |
| import click |
| |
| # Import a widget internal for formatting time codes |
| from .widget import TimeCode |
| |
| |
| # Status() |
| # |
| # A widget for formatting overall status. |
| # |
| # Note that the render() and clear() methods in this class are |
| # simply noops in the case that the application is not connected |
| # to a terminal, or if the terminal does not support ANSI escape codes. |
| # |
| # Args: |
| # context (Context): The Context |
| # state (State): The state data from the Core |
| # content_profile (Profile): Formatting profile for content text |
| # format_profile (Profile): Formatting profile for formatting text |
| # success_profile (Profile): Formatting profile for success text |
| # error_profile (Profile): Formatting profile for error text |
| # stream (Stream): The Stream |
| # |
class Status:

    # Table of the terminal capabilities we require and use, mapping the
    # names used in this class to the terminfo capability names which
    # are resolved with curses.tigetstr()
    _TERM_CAPABILITIES = {"move_up": "cuu1", "move_x": "hpa", "clear_eol": "el"}

    def __init__(self, context, state, content_profile, format_profile, success_profile, error_profile, stream):

        self._context = context
        self._state = state
        self._content_profile = content_profile
        self._format_profile = format_profile
        self._success_profile = success_profile
        self._error_profile = error_profile
        self._stream = stream
        self._jobs = OrderedDict()  # The _StatusJob objects, keyed by task id
        self._last_lines = 0  # Number of status lines we last printed to console
        self._spacing = 1  # Number of spaces printed between job columns
        self._header = _StatusHeader(
            context, state, content_profile, format_profile, success_profile, error_profile, stream
        )

        self._term_width, _ = shutil.get_terminal_size()
        self._alloc_lines = 0  # Number of job lines in the current allocation
        self._alloc_columns = None  # Width of each job column in the current allocation
        self._need_alloc = True  # Whether the allocation needs to be recalculated
        self._term_caps = self._init_terminal()

        state.register_task_added_callback(self._add_job)
        state.register_task_removed_callback(self._remove_job)
        state.register_task_changed_callback(self._job_changed)

    # clear()
    #
    # Clear the status area, it is necessary to call
    # this before printing anything to the console if
    # a status area is in use.
    #
    # To print some logging to the output and then restore
    # the status, use the following:
    #
    #   status.clear()
    #   ... print something to console ...
    #   status.render()
    #
    def clear(self):

        if not self._term_caps:
            return

        # Walk back up over every line printed by the last render(),
        # erasing each one on the way.
        for _ in range(self._last_lines):
            self._move_up()
            self._clear_line()
        self._last_lines = 0

    # render()
    #
    # Render the status area.
    #
    # If you are not printing a line in addition to rendering
    # the status area, for instance in a timeout, then it is
    # not necessary to call clear().
    #
    def render(self):

        if not self._term_caps:
            return

        elapsed = self._state.elapsed_time()

        self.clear()
        self._check_term_width()
        self._allocate()

        # Nothing to render, early return
        if self._alloc_lines == 0:
            return

        total_lines = self._alloc_lines + self._header.lines

        # Before rendering the actual lines, we need to add some line
        # feeds for the amount of lines we intend to print first, and
        # move cursor position back to the first line
        for _ in range(total_lines):
            click.echo("", err=True)
        for _ in range(total_lines):
            self._move_up()

        # Render the header
        text = self._header.render(self._term_width, elapsed)
        click.echo(text, err=True)

        # Now we have the number of columns, and an allocation for
        # alignment of each column
        n_columns = len(self._alloc_columns)
        for line in self._job_lines(n_columns):
            click.echo(self._render_line(line, n_columns, elapsed), err=True)

        # Track what we printed last, for the next clear
        self._last_lines = total_lines

    ###################################################
    #                 Private Methods                 #
    ###################################################

    # _render_line()
    #
    # Renders one line of jobs, padding every job to the width
    # allocated for its column.
    #
    # Args:
    #    line (list): The jobs to render on this line
    #    n_columns (int): The number of allocated columns
    #    elapsed: The session elapsed time, as passed to the jobs
    #
    # Returns:
    #    (str): The rendered line
    #
    def _render_line(self, line, n_columns, elapsed):
        parts = []

        # The position of a job in the line is its column, no need
        # to search the line for it.
        for column, job in enumerate(line):
            parts.append(job.render(self._alloc_columns[column] - job.size, elapsed))

            # Add spacing after every column except the last allocated one
            if column < (n_columns - 1):
                parts.append(" " * self._spacing)

        return "".join(parts)

    # _job_changed()
    #
    # Reacts to a specified job being changed
    #
    # Args:
    #    task_id (str): The unique identifier of the task
    #
    def _job_changed(self, task_id):
        task = self._state.tasks[task_id]
        job = self._jobs[task_id]

        # Only a change in the rendered size of the job
        # requires a new column allocation
        if job.update(task):
            self._need_alloc = True

    # _init_terminal()
    #
    # Initialize the terminal and return the resolved terminal
    # capabilities dictionary.
    #
    # Returns:
    #    (dict|None): The resolved terminal capabilities dictionary,
    #                 or None if the terminal does not support all
    #                 of the required capabilities.
    #
    def _init_terminal(self):

        # We need both output streams to be connected to a terminal
        if not (sys.stdout.isatty() and sys.stderr.isatty()):
            return None

        # Initialize the terminal, curses might decide it doesn't
        # support this terminal
        try:
            curses.setupterm(os.environ.get("TERM", "dumb"))
        except curses.error:
            return None

        term_caps = {}

        # Resolve the string capabilities we need for the capability
        # names we need.
        #
        for capname, capval in self._TERM_CAPABILITIES.items():
            code = curses.tigetstr(capval)

            # If any of the required capabilities resolve empty strings or None,
            # then we don't have the capabilities we need for a status bar on
            # this terminal.
            if not code:
                return None

            # Decode sequences as latin1, as they are always 8-bit bytes,
            # so when b'\xff' is returned, this must be decoded to u'\xff'.
            #
            # This technique is employed by the python blessings library
            # as well, and should provide better compatibility with most
            # terminals.
            #
            term_caps[capname] = code.decode("latin1")

        return term_caps

    # _check_term_width()
    #
    # Polls the terminal width, and requests a new allocation
    # if it changed since the last time we looked.
    #
    def _check_term_width(self):
        term_width, _ = shutil.get_terminal_size()
        if self._term_width != term_width:
            self._term_width = term_width
            self._need_alloc = True

    # _move_up()
    #
    # Moves the cursor to the beginning of the previous line
    #
    def _move_up(self):
        assert self._term_caps is not None

        # Explicitly move to beginning of line, fixes things up
        # when there was a ^C or ^Z printed to the terminal.
        move_x = curses.tparm(self._term_caps["move_x"].encode("latin1"), 0)
        move_x = move_x.decode("latin1")

        move_up = curses.tparm(self._term_caps["move_up"].encode("latin1"))
        move_up = move_up.decode("latin1")

        click.echo(move_x + move_up, nl=False, err=True)

    # _clear_line()
    #
    # Clears from the cursor position to the end of the line
    #
    def _clear_line(self):
        assert self._term_caps is not None

        clear_eol = curses.tparm(self._term_caps["clear_eol"].encode("latin1"))
        clear_eol = clear_eol.decode("latin1")
        click.echo(clear_eol, nl=False, err=True)

    # _allocate()
    #
    # Recalculates the number of lines and the column widths used
    # to lay out the jobs, if a new allocation was requested.
    #
    def _allocate(self):
        if not self._need_alloc:
            return

        # State when there is no jobs to display
        alloc_lines = 0
        alloc_columns = []
        line_length = 0

        # Test for the widest width which fits columnized jobs
        for columns in reversed(range(len(self._jobs))):
            alloc_lines, alloc_columns = self._allocate_columns(columns + 1)

            # If the sum of column widths with spacing in between
            # fits into the terminal width, this is a good allocation.
            #
            # If nothing fits, we end up with the single column
            # allocation which was tried last.
            line_length = sum(alloc_columns) + (columns * self._spacing)
            if line_length < self._term_width:
                break

        self._alloc_lines = alloc_lines
        self._alloc_columns = alloc_columns
        self._need_alloc = False

    # _job_lines()
    #
    # Yields the tracked jobs in lists of at most `columns` jobs each
    #
    # Args:
    #    columns (int): The number of jobs per line, must not be 0
    #
    def _job_lines(self, columns):
        jobs_list = list(self._jobs.values())
        for i in range(0, len(self._jobs), columns):
            yield jobs_list[i : i + columns]

    # _allocate_columns()
    #
    # Calculates the layout of the current jobs for a given
    # number of columns.
    #
    # Args:
    #    columns (int): The number of columns to lay out
    #
    # Returns:
    #    (int): The number of lines required
    #    (list): The maximum length in characters for each column
    #
    def _allocate_columns(self, columns):
        column_widths = [0] * columns
        lines = 0
        for line in self._job_lines(columns):
            lines += 1

            # A line never holds more than `columns` jobs, so the position
            # of a job in its line is always a valid column index.
            for col, job in enumerate(line):
                column_widths[col] = max(column_widths[col], job.size)

        return lines, column_widths

    # _add_job()
    #
    # Adds a job to track in the status area
    #
    # Args:
    #    task_id (str): The unique identifier of the task
    #
    def _add_job(self, task_id):
        task = self._state.tasks[task_id]
        elapsed = task.elapsed_offset
        job = _StatusJob(
            self._context, task.action_name, task.full_name, self._content_profile, self._format_profile, elapsed
        )
        self._jobs[task_id] = job
        self._need_alloc = True

    # _remove_job()
    #
    # Removes a job currently being tracked in the status area
    #
    # Args:
    #    task_id (str): The unique identifier of the task
    #
    def _remove_job(self, task_id):
        del self._jobs[task_id]
        self._need_alloc = True
| |
| |
| # _StatusHeader() |
| # |
| # A delegate object for rendering the header part of the Status() widget |
| # |
| # Args: |
| # context (Context): The Context |
| # state (State): The state data from the Core |
| # content_profile (Profile): Formatting profile for content text |
| # format_profile (Profile): Formatting profile for formatting text |
| # success_profile (Profile): Formatting profile for success text |
| # error_profile (Profile): Formatting profile for error text |
| # stream (Stream): The Stream |
| # |
class _StatusHeader:
    def __init__(self, context, state, content_profile, format_profile, success_profile, error_profile, stream):

        #
        # Public members
        #
        self.lines = 3  # Number of lines rendered by render()

        #
        # Private members
        #
        self._content_profile = content_profile
        self._format_profile = format_profile
        self._success_profile = success_profile
        self._error_profile = error_profile
        self._stream = stream
        self._state = state
        self._time_code = TimeCode(context, content_profile, format_profile)
        self._context = context

    # render()
    #
    # Render the header, return a rendered string
    #
    # Args:
    #    line_length (int): The width to render for, widths below
    #                       80 are rendered as 80
    #    elapsed: The session elapsed time, as passed to the time code
    #
    # Returns:
    #    (str): The three header lines, separated by line feeds
    #
    def render(self, line_length, elapsed):
        line_length = max(line_length, 80)

        line1 = self._render_session_line(line_length, elapsed)
        line2 = self._render_queues_line(line_length)
        line3 = self._render_cache_line(line_length)

        return line1 + "\n" + line2 + "\n" + line3

    ###################################################
    #                 Private Methods                 #
    ###################################################

    # _render_session_line()
    #
    # Line 1: Session time, project name, session / total elements
    #
    #  ========= 00:00:00 project-name (143/387) =========
    #
    def _render_session_line(self, line_length, elapsed):
        project = self._context.get_toplevel_project()
        session = str(len(self._stream.session_elements))
        total = str(len(self._stream.total_elements))

        # The sizes count visible characters only, since the
        # profiles may add formatting to the text itself.
        size = len(total) + len(session) + 4  # Size for (N/N) with a leading space
        size += 8  # Size of time code
        size += len(project.name) + 1

        text = self._time_code.render_time(elapsed)
        text += " " + self._content_profile.fmt(project.name)
        text += (
            " "
            + self._format_profile.fmt("(")
            + self._content_profile.fmt(session)
            + self._format_profile.fmt("/")
            + self._content_profile.fmt(total)
            + self._format_profile.fmt(")")
        )

        return self._centered(text, size, line_length, "=")

    # _render_queues_line()
    #
    # Line 2: Dynamic list of queue status reports
    #
    #  (Sources Fetched:0 117 0)→ (Built:4 0 0)
    #
    def _render_queues_line(self, line_length):
        size = 0
        text = ""

        # Format and calculate size for each queue progress
        for index, task_group in enumerate(self._state.task_groups.values()):

            # Add spacing
            if index > 0:
                size += 2
                text += self._format_profile.fmt("→ ")

            group_text, group_size = self._render_task_group(task_group)
            size += group_size
            text += group_text

        return self._centered(text, size, line_length, " ")

    # _render_cache_line()
    #
    # Line 3: Cache usage percentage report
    #
    #  ~~~~~~ cache: 44.2G / 64G (69%) ~~~~~~
    #
    def _render_cache_line(self, line_length):
        cas = self._context.get_cascache()
        usage = cas.get_cache_usage()
        usage_string = str(usage)

        if usage.used_size is None:
            # Cache usage is unknown
            size = 0
            text = ""
        else:
            size = 21  # Size of '~~~~~~ cache: ' and ' ~~~~~~'
            size += len(usage_string)
            if usage.used_percent >= 95:
                formatted_usage = self._error_profile.fmt(usage_string)
            elif usage.used_percent >= 80:
                formatted_usage = self._content_profile.fmt(usage_string)
            else:
                formatted_usage = self._success_profile.fmt(usage_string)

            text = (
                self._format_profile.fmt("~~~~~~ ")
                + self._content_profile.fmt("cache")
                + self._format_profile.fmt(": ")
                + formatted_usage
                + self._format_profile.fmt(" ~~~~~~")
            )

        return self._centered(text, size, line_length, " ")

    # _render_task_group()
    #
    # Renders the report of a single task group
    #
    # Args:
    #    group: The task group to report
    #
    # Returns:
    #    (str): The rendered text
    #    (int): The number of visible characters in the rendered text
    #
    def _render_task_group(self, group):
        processed = str(group.processed_tasks)
        skipped = str(group.skipped_tasks)
        failed = str(len(group.failed_tasks))

        size = 5  # Space for the formatting '(', ':', ' ', ' ' and ')'
        size += len(group.complete_name)
        size += len(processed) + len(skipped) + len(failed)
        text = (
            self._format_profile.fmt("(")
            + self._content_profile.fmt(group.complete_name)
            + self._format_profile.fmt(":")
            + self._success_profile.fmt(processed)
            + " "
            + self._content_profile.fmt(skipped)
            + " "
            + self._error_profile.fmt(failed)
            + self._format_profile.fmt(")")
        )

        return (text, size)

    # _centered()
    #
    # Centers text in a line, surrounded by fill characters
    #
    # Args:
    #    text (str): The formatted text to center
    #    size (int): The number of visible characters in `text`
    #    line_length (int): The length of the line to fill
    #    fill (str): The character to fill the line with
    #
    # Returns:
    #    (str): The text, with one space and the fill on either side
    #
    def _centered(self, text, size, line_length, fill):
        remaining = line_length - size
        remaining -= 2  # One space on either side of the text

        # When the remainder is odd, the extra fill character goes to the
        # right hand side so that the line spans the full line length.
        #
        # If the text does not fit, the fill amounts are zero or
        # negative and no fill is added at all.
        left = remaining // 2
        right = remaining - left

        final_text = self._format_profile.fmt(fill * left) + " "
        final_text += text
        final_text += " " + self._format_profile.fmt(fill * right)

        return final_text
| |
| |
| # _StatusJob() |
| # |
| # A delegate object for rendering a job in the status area |
| # |
| # Args: |
| # context (Context): The Context |
| # action_name (str): The action performed |
| # full_name (str): The name of the job |
| # content_profile (Profile): Formatting profile for content text |
| # format_profile (Profile): Formatting profile for formatting text |
| # elapsed (datetime): The offset into the session when this job is created |
| # |
class _StatusJob:
    def __init__(self, context, action_name, full_name, content_profile, format_profile, elapsed):
        #
        # Private members
        #
        self._offset = elapsed  # Session offset at which this job was created
        self._content_profile = content_profile
        self._format_profile = format_profile
        self._time_code = TimeCode(context, content_profile, format_profile)
        self._current_progress = None  # Progress tally to render
        self._maximum_progress = None  # Progress tally to render

        #
        # Public members
        #
        self.action_name = action_name  # The action name
        self.full_name = full_name  # The name of the job
        self.size = None  # The number of characters required to render
        self.size = self.calculate_size()

    # calculate_size()
    #
    # Works out how many characters the job occupies once rendered,
    # not counting any column padding.
    #
    # Returns:
    #    int: The size of the job when rendered
    #
    def calculate_size(self):
        widths = [
            10,  # Time code with its brackets
            3,  # '[' + ':' + ']'
            len(self.action_name),
            len(self.full_name),
        ]

        if self._current_progress is not None:
            # ':' followed by the current tally
            widths.append(1 + len(str(self._current_progress)))
            if self._maximum_progress is not None:
                # '/' followed by the maximum tally
                widths.append(1 + len(str(self._maximum_progress)))

        return sum(widths)

    # update()
    #
    # Copies the progress tallies of the provided Task, and
    # reports whether this changed the rendered size of the job
    #
    # Args:
    #    task (Task): The task associated with this job
    #
    # Returns:
    #    bool: Whether the size of the job has changed
    #
    def update(self, task):
        current_differs = task.current_progress != self._current_progress
        if current_differs:
            self._current_progress = task.current_progress

        maximum_differs = task.maximum_progress != self._maximum_progress
        if maximum_differs:
            self._maximum_progress = task.maximum_progress

        # Same tallies as before, the size cannot have changed
        if not (current_differs or maximum_differs):
            return False

        previous_size = self.size
        self.size = self.calculate_size()
        return self.size != previous_size

    # render()
    #
    # Produce the text for this job
    #
    # Args:
    #    padding (int): Amount of padding to print in order to align with columns
    #    elapsed (datetime): The session elapsed time offset
    #
    # Returns:
    #    str: The rendered job
    #
    def render(self, padding, elapsed):
        formatting = self._format_profile.fmt
        content = self._content_profile.fmt

        # Time spent on this job so far, in brackets
        pieces = [
            formatting("["),
            self._time_code.render_time(elapsed - self._offset),
            formatting("]"),
        ]

        # The action and the name of the job
        pieces += [
            formatting("["),
            content(self.action_name),
            formatting(":"),
            content(self.full_name),
        ]

        # The progress tallies, when they are known
        if self._current_progress is not None:
            pieces += [formatting(":"), content(str(self._current_progress))]
            if self._maximum_progress is not None:
                pieces += [formatting("/"), content(str(self._maximum_progress))]

        # The padding goes before the terminating ']'
        pieces.append(formatting((" " * padding) + "]"))

        return "".join(pieces)