blob: fcdc5a5a3fe50554b0822e76bdb9394ef3f593ed [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright (C) 2016-2018 Codethink Limited
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
# Tristan Van Berkom <tristan.vanberkom@codethink.co.uk>
import os
import sys
import resource
from contextlib import contextmanager
from blessings import Terminal
import click
from click import UsageError
from .cli import cli
# Import buildstream public symbols
from .. import Scope
# Import various buildstream internals
from .._context import Context
from .._project import Project
from .._exceptions import BstError, LoadError
from .._message import MessageType, unconditional_messages
from .._pipeline import Pipeline, PipelineError
from .._scheduler import Scheduler
from .._profile import Topics, profile_start, profile_end
from .. import _yaml
from .. import __version__ as build_stream_version
# Import frontend assets
from . import Profile, LogLine, Status
from .complete import main_bashcomplete, complete_path, CompleteUnhandled
# Indentation (in spaces) for all logging
INDENT = 4
##################################################################
# Main Application State #
##################################################################
class App():
    """Main application state of the command line frontend.

    Holds the parsed main options, the UI color profiles and the handles
    on the main assets (context, project, scheduler and pipeline), and
    implements the callbacks which the scheduler and the message handling
    invoke while a pipeline is being processed.
    """

    def __init__(self, main_options):
        """Set up UI state from the main command line options.

        Args:
           main_options (dict): The main options; this constructor reads
                                the 'no_interactive' and 'colors' keys.
        """
        self.main_options = main_options
        self.logger = None
        self.status = None
        self.target = None

        # Main asset handles, populated by initialize()
        self.context = None
        self.project = None
        self.scheduler = None
        self.pipeline = None

        # Failure messages, hashed by unique plugin id
        self.fail_messages = {}

        # UI Colors Profiles
        self.content_profile = Profile(fg='yellow')
        self.format_profile = Profile(fg='cyan', dim=True)
        self.success_profile = Profile(fg='green')
        self.error_profile = Profile(fg='red', dim=True)
        self.detail_profile = Profile(dim=True)

        # Check if we are connected to a tty
        self.is_a_tty = Terminal().is_a_tty

        # Figure out interactive mode: explicitly disabled, or else
        # interactive only when connected to a tty
        if self.main_options['no_interactive']:
            self.interactive = False
        else:
            self.interactive = self.is_a_tty

        # Whether we handle failures interactively
        # defaults to whether we are interactive or not.
        self.interactive_failures = self.interactive

        # Resolve whether to use colors in output; when the option is
        # left unspecified (None) we follow whether we have a tty.
        colors = self.main_options['colors']
        self.colors = self.is_a_tty if colors is None else bool(colors)

        # Increase the soft limit for open file descriptors to the maximum.
        # SafeHardlinks FUSE needs to hold file descriptors for all processes in the sandbox.
        # Avoid hitting the limit too quickly.
        soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
        if soft_limit != hard_limit:
            # Set soft limit to hard limit
            resource.setrlimit(resource.RLIMIT_NOFILE, (hard_limit, hard_limit))

    #
    # Initialize the main pipeline
    #
    def initialize(self, elements, except_=tuple(), rewritable=False,
                   use_configured_remote_caches=False, add_remote_cache=None,
                   track_elements=None, fetch_subprojects=False):
        """Load the context, project and pipeline and create the UI helpers.

        Any BstError raised while loading the user configuration, the
        project or the pipeline is printed to stderr and the process
        exits with sys.exit(-1).

        Args:
           elements: The element names, forwarded to Pipeline
           except_: Forwarded to Pipeline
           rewritable: Forwarded to Pipeline
           use_configured_remote_caches: Forwarded to Pipeline.initialize()
           add_remote_cache: Forwarded to Pipeline.initialize()
           track_elements: Forwarded to Pipeline.initialize()
           fetch_subprojects: Forwarded to Context
        """
        # Compute the profiling key only once, so that the start and end
        # of the profiled section are guaranteed to use the same key.
        profile_key = "_".join(t.replace(os.sep, '-') for t in elements)
        profile_start(Topics.LOAD_PIPELINE, profile_key)

        directory = self.main_options['directory']
        config = self.main_options['config']

        try:
            self.context = Context(fetch_subprojects=fetch_subprojects)
            self.context.load(config)
        except BstError as e:
            click.echo("Error loading user configuration: {}".format(e), err=True)
            sys.exit(-1)

        # Override things in the context from our command line options,
        # the command line when used, trumps the config files.
        #
        override_map = {
            'strict': 'strict_build_plan',
            'debug': 'log_debug',
            'verbose': 'log_verbose',
            'error_lines': 'log_error_lines',
            'message_lines': 'log_message_lines',
            'on_error': 'sched_error_action',
            'fetchers': 'sched_fetchers',
            'builders': 'sched_builders',
            'pushers': 'sched_pushers',
            'network_retries': 'sched_network_retries'
        }
        for cli_option, context_attr in override_map.items():
            option_value = self.main_options.get(cli_option)
            if option_value is not None:
                setattr(self.context, context_attr, option_value)

        # Disable interactive failures if --on-error was specified
        # on the command line, but not if it was only specified
        # in the config.
        if self.main_options.get('on_error') is not None:
            self.interactive_failures = False

        # Create the application's scheduler
        self.scheduler = Scheduler(self.context,
                                   interrupt_callback=self.interrupt_handler,
                                   ticker_callback=self.tick,
                                   job_start_callback=self.job_started,
                                   job_complete_callback=self.job_completed)

        # Create the logger right before setting the message handler
        self.logger = LogLine(
            self.content_profile,
            self.format_profile,
            self.success_profile,
            self.error_profile,
            self.detail_profile,
            # Indentation for detailed messages
            indent=INDENT,
            # Number of last lines in an element's log to print (when encountering errors)
            log_lines=self.context.log_error_lines,
            # Maximum number of lines to print in a detailed message
            message_lines=self.context.log_message_lines,
            # Whether to print additional debugging information
            debug=self.context.log_debug,
            message_format=self.context.log_message_format)

        # Propagate pipeline feedback to the user
        self.context._set_message_handler(self.message_handler)

        try:
            self.project = Project(directory, self.context, cli_options=self.main_options['option'])
        except BstError as e:
            click.echo("Error loading project: {}".format(e), err=True)
            sys.exit(-1)

        try:
            self.pipeline = Pipeline(self.context, self.project, elements, except_,
                                     rewritable=rewritable)
        except BstError as e:
            click.echo("Error loading pipeline: {}".format(e), err=True)
            sys.exit(-1)

        # Create our status printer, only available in interactive
        self.status = Status(self.content_profile, self.format_profile,
                             self.success_profile, self.error_profile,
                             self.pipeline, self.scheduler,
                             colors=self.colors)

        # Initialize pipeline
        try:
            self.pipeline.initialize(use_configured_remote_caches=use_configured_remote_caches,
                                     add_remote_cache=add_remote_cache,
                                     track_elements=track_elements)
        except BstError as e:
            click.echo("Error initializing pipeline: {}".format(e), err=True)
            sys.exit(-1)

        # Pipeline is loaded, lets start displaying pipeline messages from tasks
        self.logger.size_request(self.pipeline)

        profile_end(Topics.LOAD_PIPELINE, profile_key)

    #
    # Render the status area, conditional on some internal state
    #
    def maybe_render_status(self):
        """Render the status area unless we are suspended or terminating."""
        # Nothing to render until both the status printer and the
        # scheduler have been created.
        if not (self.status and self.scheduler):
            return

        # If we're suspended or terminating, then dont render the status area
        if self.scheduler.suspended or self.scheduler.terminated:
            return

        self.status.render()

    #
    # Handle ^C SIGINT interruptions in the scheduling main loop
    #
    def interrupt_handler(self):
        """Scheduler interrupt callback.

        In non interactive mode the ongoing jobs are terminated right
        away, otherwise the user is asked how to proceed.
        """
        # Only handle ^C interactively in interactive mode
        if not self.interactive:
            self.status.clear()
            self.scheduler.terminate_jobs()
            return

        # Here we can give the user some choices, like whether they would
        # like to continue, abort immediately, or only complete processing of
        # the currently ongoing tasks. We can also print something more
        # intelligent, like how many tasks remain to complete overall.
        with self.interrupted():
            click.echo("\nUser interrupted with ^C\n" +
                       "\n"
                       "Choose one of the following options:\n" +
                       " (c)ontinue - Continue queueing jobs as much as possible\n" +
                       " (q)uit - Exit after all ongoing jobs complete\n" +
                       " (t)erminate - Terminate any ongoing jobs and exit\n" +
                       "\n" +
                       "Pressing ^C again will terminate jobs and exit\n",
                       err=True)

            try:
                choice = click.prompt("Choice:",
                                      value_proc=prefix_choice_value_proc(['continue', 'quit', 'terminate']),
                                      default='continue', err=True)
            except click.Abort:
                # Ensure a newline after automatically printed '^C'
                click.echo("", err=True)
                choice = 'terminate'

            if choice == 'terminate':
                click.echo("\nTerminating all jobs at user request\n", err=True)
                self.scheduler.terminate_jobs()
            elif choice == 'quit':
                click.echo("\nCompleting ongoing tasks before quitting\n", err=True)
                self.scheduler.stop_queueing()
            elif choice == 'continue':
                click.echo("\nContinuing\n", err=True)

    def job_started(self, element, action_name):
        """Scheduler callback: track the newly started job in the status area."""
        self.status.add_job(element, action_name)
        self.maybe_render_status()

    def job_completed(self, element, queue, action_name, success):
        """Scheduler callback: untrack the job and handle its failure, if any."""
        self.status.remove_job(element, action_name)
        self.maybe_render_status()

        # Dont attempt to handle a failure if the user has already opted to
        # terminate
        if success or self.scheduler.terminated:
            return

        # Get the last failure message for additional context
        failure = self.fail_messages.get(element._get_unique_id())

        # XXX This is dangerous, sometimes we get the job completed *before*
        # the failure message reaches us ??
        if not failure:
            self.status.clear()
            click.echo("\n\n\nBUG: Message handling out of sync, " +
                       "unable to retrieve failure message for element {}\n\n\n\n\n"
                       .format(element), err=True)
        else:
            self.handle_failure(element, queue, failure)

    def handle_failure(self, element, queue, failure):
        """React to a failed job, interactively when that is enabled.

        Args:
           element: The element whose job failed
           queue: The queue in which the job failed; used to retry the job
           failure: The failure message recorded for the element
        """
        # Handle non interactive mode setting of what to do when a job fails.
        if not self.interactive_failures:
            if self.context.sched_error_action == 'terminate':
                self.scheduler.terminate_jobs()
            elif self.context.sched_error_action == 'quit':
                self.scheduler.stop_queueing()
            elif self.context.sched_error_action == 'continue':
                pass
            return

        # Choices which end the interaction, and the full list of
        # choices on offer, built alongside the summary describing them.
        final_choices = ['continue', 'quit', 'terminate', 'retry']
        choices = list(final_choices)

        # Interactive mode for element failures
        with self.interrupted():

            summary = ("\n{} failure on element: {}\n".format(failure.action_name, element.name) +
                       "\n" +
                       "Choose one of the following options:\n" +
                       " (c)ontinue - Continue queueing jobs as much as possible\n" +
                       " (q)uit - Exit after all ongoing jobs complete\n" +
                       " (t)erminate - Terminate any ongoing jobs and exit\n" +
                       " (r)etry - Retry this job\n")
            if failure.logfile:
                summary += " (l)og - View the full log file\n"
                choices.append('log')
            if failure.sandbox:
                summary += " (s)hell - Drop into a shell in the failed build sandbox\n"
                choices.append('shell')
            summary += "\nPressing ^C will terminate jobs and exit\n"

            choice = ''
            while choice not in final_choices:
                click.echo(summary, err=True)

                try:
                    choice = click.prompt("Choice:", default='continue', err=True,
                                          value_proc=prefix_choice_value_proc(choices))
                except click.Abort:
                    # Ensure a newline after automatically printed '^C'
                    click.echo("", err=True)
                    choice = 'terminate'

                # Handle choices which you can come back from
                #
                if choice == 'shell':
                    click.echo("\nDropping into an interactive shell in the failed build sandbox\n", err=True)
                    try:
                        self.shell(element, Scope.BUILD, failure.sandbox, isolate=True)
                    except BstError as e:
                        click.echo("Error while attempting to create interactive shell: {}".format(e), err=True)
                elif choice == 'log':
                    # An unreadable log file should not abort the failure
                    # handling; report it and ask again, as is done for
                    # the shell above.
                    try:
                        with open(failure.logfile, 'r') as logfile:
                            content = logfile.read()
                    except OSError as e:
                        click.echo("Error while attempting to read log file: {}".format(e), err=True)
                    else:
                        click.echo_via_pager(content)

            if choice == 'terminate':
                click.echo("\nTerminating all jobs\n", err=True)
                self.scheduler.terminate_jobs()
            elif choice == 'quit':
                click.echo("\nCompleting ongoing tasks before quitting\n", err=True)
                self.scheduler.stop_queueing()
            elif choice == 'continue':
                click.echo("\nContinuing with other non failing elements\n", err=True)
            elif choice == 'retry':
                click.echo("\nRetrying failed job\n", err=True)
                queue.failed_elements.remove(element)
                queue.enqueue([element])

    def shell(self, element, scope, directory, *, mounts=None, isolate=False, command=None):
        """Run element._shell() with a prompt showing the element key and name.

        All arguments other than the element are forwarded to
        element._shell(), whose return value is returned.
        """
        _, key, dim = element._get_full_display_key()
        element_name = element._get_full_name()

        if self.colors:
            prompt = self.format_profile.fmt('[') + \
                self.content_profile.fmt(key, dim=dim) + \
                self.format_profile.fmt('@') + \
                self.content_profile.fmt(element_name) + \
                self.format_profile.fmt(':') + \
                self.content_profile.fmt('$PWD') + \
                self.format_profile.fmt(']$') + ' '
        else:
            prompt = '[{}@{}:${{PWD}}]$ '.format(key, element_name)

        return element._shell(scope, directory, mounts=mounts, isolate=isolate, prompt=prompt, command=command)

    def tick(self, elapsed):
        """Scheduler ticker callback: refresh the status area."""
        self.maybe_render_status()

    #
    # Prints the application startup heading, used for commands which
    # will process a pipeline.
    #
    def print_heading(self, deps=None):
        """Print the startup heading through the logger."""
        self.logger.print_heading(self.pipeline,
                                  self.main_options['log_file'],
                                  styling=self.colors,
                                  deps=deps)

    #
    # Print a summary of the queues
    #
    def print_summary(self):
        """Print an empty line to stderr followed by the logger's summary."""
        click.echo("", err=True)
        self.logger.print_summary(self.pipeline, self.scheduler,
                                  self.main_options['log_file'],
                                  styling=self.colors)

    #
    # Print an error
    #
    def print_error(self, error):
        """Print an error to stderr, with its detail indented when present."""
        click.echo("", err=True)
        click.echo("{}".format(error), err=True)
        if error.detail:
            # Indent every line of the detail, keeping the line endings
            indent = " " * INDENT
            detail = '\n' + indent + indent.join(error.detail.splitlines(True))
            click.echo("{}".format(detail), err=True)

    #
    # Handle messages from the pipeline
    #
    def message_handler(self, message, context):
        """Message handler installed on the context by initialize().

        Records failure messages for later failure handling and echoes
        the rendered message to stderr and to the log file, if any.
        """
        # Drop status messages from the UI if not verbose, we'll still see
        # info messages and status messages will still go to the log files.
        if not context.log_verbose and message.message_type == MessageType.STATUS:
            return

        # Hold on to the failure messages
        if message.message_type in [MessageType.FAIL, MessageType.BUG] and message.unique_id is not None:
            self.fail_messages[message.unique_id] = message

        # Send to frontend if appropriate
        if self.context._silent_messages() and (message.message_type not in unconditional_messages):
            return

        # The status printer does not exist yet while the project and
        # pipeline are still being loaded.
        if self.status:
            self.status.clear()

        text = self.logger.render(message)
        click.echo(text, color=self.colors, nl=False, err=True)

        # Maybe render the status area
        self.maybe_render_status()

        # Additionally log to a file
        if self.main_options['log_file']:
            click.echo(text, file=self.main_options['log_file'], color=False, nl=False)

    @contextmanager
    def interrupted(self):
        """Context manager suspending the scheduler while prompting the user.

        Signals are disconnected, the status area is cleared and the jobs
        are suspended for the duration of the block. All of this is undone
        when the block exits, including when it exits with an exception,
        so that a failing prompt never leaves the scheduler suspended.
        """
        self.scheduler.disconnect_signals()
        self.status.clear()
        self.scheduler.suspend_jobs()

        try:
            yield
        finally:
            self.maybe_render_status()
            self.scheduler.resume_jobs()
            self.scheduler.connect_signals()

    def cleanup(self):
        """Clean up the pipeline, if one was created."""
        if self.pipeline:
            self.pipeline.cleanup()
#
# Return a value processor for partial choice matching.
# The returned value processor tests the passed value against every item
# in the 'choices' list. If the value is a prefix of exactly one of the
# 'choices' elements, that element is returned. If no element matches, or
# the input is ambiguous between several elements, a 'click.UsageError'
# exception is raised with a description of the error.
#
# Note that Click expects user input errors to be signaled by raising a
# 'click.UsageError' exception. That way, Click displays an error message
# and asks for new input.
#
def prefix_choice_value_proc(choices):
    """Return a value processor for partial choice matching.

    The returned function is suitable as the 'value_proc' argument of
    click.prompt(): it resolves the user input to one of the given choices.

    Args:
       choices (list): The full names of the accepted choices

    Returns:
       A function taking the user input and returning the matching choice.
       An input which is exactly one of the choices is returned as is, even
       when it is also a prefix of another, longer choice. Otherwise the
       input must be a prefix of exactly one choice, which is returned.

    The returned function raises click.UsageError when the input matches
    no choice, or when it is a prefix of several choices without being
    exactly one of them.
    """
    def value_proc(user_input):
        # An exact match is never ambiguous, even when another choice
        # happens to start with the same characters.
        if user_input in choices:
            return user_input

        candidates = [choice for choice in choices if choice.startswith(user_input)]

        if not candidates:
            raise UsageError("Expected one of {}, got {}".format(choices, user_input))
        if len(candidates) > 1:
            raise UsageError("Ambiguous input. '{}' can refer to one of {}".format(user_input, candidates))

        return candidates[0]

    return value_proc