blob: b48462ff57acded75f77dde81409be1e2d3d118e [file] [log] [blame]
from __future__ import absolute_import
from __future__ import unicode_literals
import sys
from collections import namedtuple
from itertools import cycle
from threading import Thread
from six.moves import _thread as thread
from six.moves.queue import Empty
from six.moves.queue import Queue
from . import colors
from compose import utils
from compose.cli.signals import ShutdownException
from compose.utils import split_buffer
class LogPresenter(object):
def __init__(self, prefix_width, color_func):
self.prefix_width = prefix_width
self.color_func = color_func
def present(self, container, line):
prefix = container.name_without_project.ljust(self.prefix_width)
return '{prefix} {line}'.format(
prefix=self.color_func(prefix + ' |'),
line=line)
def build_log_presenters(service_names, monochrome):
"""Return an iterable of functions.
Each function can be used to format the logs output of a container.
"""
prefix_width = max_name_width(service_names)
def no_color(text):
return text
for color_func in cycle([no_color] if monochrome else colors.rainbow()):
yield LogPresenter(prefix_width, color_func)
def max_name_width(service_names, max_index_width=3):
"""Calculate the maximum width of container names so we can make the log
prefixes line up like so:
db_1 | Listening
web_1 | Listening
"""
return max(len(name) for name in service_names) + max_index_width
class LogPrinter(object):
"""Print logs from many containers to a single output stream."""
def __init__(self,
containers,
presenters,
event_stream,
output=sys.stdout,
cascade_stop=False,
log_args=None):
self.containers = containers
self.presenters = presenters
self.event_stream = event_stream
self.output = utils.get_output_stream(output)
self.cascade_stop = cascade_stop
self.log_args = log_args or {}
def run(self):
if not self.containers:
return
queue = Queue()
thread_args = queue, self.log_args
thread_map = build_thread_map(self.containers, self.presenters, thread_args)
start_producer_thread((
thread_map,
self.event_stream,
self.presenters,
thread_args))
for line in consume_queue(queue, self.cascade_stop):
remove_stopped_threads(thread_map)
if not line:
if not thread_map:
# There are no running containers left to tail, so exit
return
# We got an empty line because of a timeout, but there are still
# active containers to tail, so continue
continue
self.output.write(line)
self.output.flush()
def remove_stopped_threads(thread_map):
for container_id, tailer_thread in list(thread_map.items()):
if not tailer_thread.is_alive():
thread_map.pop(container_id, None)
def build_thread(container, presenter, queue, log_args):
tailer = Thread(
target=tail_container_logs,
args=(container, presenter, queue, log_args))
tailer.daemon = True
tailer.start()
return tailer
def build_thread_map(initial_containers, presenters, thread_args):
return {
container.id: build_thread(container, next(presenters), *thread_args)
for container in initial_containers
}
class QueueItem(namedtuple('_QueueItem', 'item is_stop exc')):
@classmethod
def new(cls, item):
return cls(item, None, None)
@classmethod
def exception(cls, exc):
return cls(None, None, exc)
@classmethod
def stop(cls):
return cls(None, True, None)
def tail_container_logs(container, presenter, queue, log_args):
generator = get_log_generator(container)
try:
for item in generator(container, log_args):
queue.put(QueueItem.new(presenter.present(container, item)))
except Exception as e:
queue.put(QueueItem.exception(e))
return
if log_args.get('follow'):
queue.put(QueueItem.new(presenter.color_func(wait_on_exit(container))))
queue.put(QueueItem.stop())
def get_log_generator(container):
if container.has_api_logs:
return build_log_generator
return build_no_log_generator
def build_no_log_generator(container, log_args):
"""Return a generator that prints a warning about logs and waits for
container to exit.
"""
yield "WARNING: no logs are available with the '{}' log driver\n".format(
container.log_driver)
def build_log_generator(container, log_args):
# if the container doesn't have a log_stream we need to attach to container
# before log printer starts running
if container.log_stream is None:
stream = container.logs(stdout=True, stderr=True, stream=True, **log_args)
else:
stream = container.log_stream
return split_buffer(stream)
def wait_on_exit(container):
exit_code = container.wait()
return "%s exited with code %s\n" % (container.name, exit_code)
def start_producer_thread(thread_args):
producer = Thread(target=watch_events, args=thread_args)
producer.daemon = True
producer.start()
def watch_events(thread_map, event_stream, presenters, thread_args):
for event in event_stream:
if event['action'] == 'stop':
thread_map.pop(event['id'], None)
if event['action'] != 'start':
continue
if event['id'] in thread_map:
if thread_map[event['id']].is_alive():
continue
# Container was stopped and started, we need a new thread
thread_map.pop(event['id'], None)
thread_map[event['id']] = build_thread(
event['container'],
next(presenters),
*thread_args)
def consume_queue(queue, cascade_stop):
"""Consume the queue by reading lines off of it and yielding them."""
while True:
try:
item = queue.get(timeout=0.1)
except Empty:
yield None
continue
# See https://github.com/docker/compose/issues/189
except thread.error:
raise ShutdownException()
if item.exc:
raise item.exc
if item.is_stop:
if cascade_stop:
raise StopIteration
else:
continue
yield item.item