| from __future__ import absolute_import |
| from __future__ import unicode_literals |
| |
| import datetime |
| import logging |
| import operator |
| from functools import reduce |
| |
| import enum |
| from docker.errors import APIError |
| |
| from . import parallel |
| from .config import ConfigurationError |
| from .config.config import V1 |
| from .config.sort_services import get_container_name_from_network_mode |
| from .config.sort_services import get_service_name_from_network_mode |
| from .const import DEFAULT_TIMEOUT |
| from .const import IMAGE_EVENTS |
| from .const import LABEL_ONE_OFF |
| from .const import LABEL_PROJECT |
| from .const import LABEL_SERVICE |
| from .container import Container |
| from .network import build_networks |
| from .network import get_networks |
| from .network import ProjectNetworks |
| from .service import BuildAction |
| from .service import ContainerNetworkMode |
| from .service import ConvergenceStrategy |
| from .service import NetworkMode |
| from .service import Service |
| from .service import ServiceNetworkMode |
| from .utils import microseconds_from_time_nano |
| from .volume import ProjectVolumes |
| |
| |
| log = logging.getLogger(__name__) |
| |
| |
| @enum.unique |
| class OneOffFilter(enum.Enum): |
| include = 0 |
| exclude = 1 |
| only = 2 |
| |
| @classmethod |
| def update_labels(cls, value, labels): |
| if value == cls.only: |
| labels.append('{0}={1}'.format(LABEL_ONE_OFF, "True")) |
| elif value == cls.exclude: |
| labels.append('{0}={1}'.format(LABEL_ONE_OFF, "False")) |
| elif value == cls.include: |
| pass |
| else: |
| raise ValueError("Invalid value for one_off: {}".format(repr(value))) |
| |
| |
| class Project(object): |
| """ |
| A collection of services. |
| """ |
| def __init__(self, name, services, client, networks=None, volumes=None): |
| self.name = name |
| self.services = services |
| self.client = client |
| self.volumes = volumes or ProjectVolumes({}) |
| self.networks = networks or ProjectNetworks({}, False) |
| |
| def labels(self, one_off=OneOffFilter.exclude): |
| labels = ['{0}={1}'.format(LABEL_PROJECT, self.name)] |
| |
| OneOffFilter.update_labels(one_off, labels) |
| return labels |
| |
| @classmethod |
| def from_config(cls, name, config_data, client): |
| """ |
| Construct a Project from a config.Config object. |
| """ |
| use_networking = (config_data.version and config_data.version != V1) |
| networks = build_networks(name, config_data, client) |
| project_networks = ProjectNetworks.from_services( |
| config_data.services, |
| networks, |
| use_networking) |
| volumes = ProjectVolumes.from_config(name, config_data, client) |
| project = cls(name, [], client, project_networks, volumes) |
| |
| for service_dict in config_data.services: |
| service_dict = dict(service_dict) |
| if use_networking: |
| service_networks = get_networks(service_dict, networks) |
| else: |
| service_networks = {} |
| |
| service_dict.pop('networks', None) |
| links = project.get_links(service_dict) |
| network_mode = project.get_network_mode( |
| service_dict, list(service_networks.keys()) |
| ) |
| volumes_from = get_volumes_from(project, service_dict) |
| |
| if config_data.version != V1: |
| service_dict['volumes'] = [ |
| volumes.namespace_spec(volume_spec) |
| for volume_spec in service_dict.get('volumes', []) |
| ] |
| |
| project.services.append( |
| Service( |
| service_dict.pop('name'), |
| client=client, |
| project=name, |
| use_networking=use_networking, |
| networks=service_networks, |
| links=links, |
| network_mode=network_mode, |
| volumes_from=volumes_from, |
| **service_dict) |
| ) |
| |
| return project |
| |
| @property |
| def service_names(self): |
| return [service.name for service in self.services] |
| |
| def get_service(self, name): |
| """ |
| Retrieve a service by name. Raises NoSuchService |
| if the named service does not exist. |
| """ |
| for service in self.services: |
| if service.name == name: |
| return service |
| |
| raise NoSuchService(name) |
| |
| def validate_service_names(self, service_names): |
| """ |
| Validate that the given list of service names only contains valid |
| services. Raises NoSuchService if one of the names is invalid. |
| """ |
| valid_names = self.service_names |
| for name in service_names: |
| if name not in valid_names: |
| raise NoSuchService(name) |
| |
| def get_services(self, service_names=None, include_deps=False): |
| """ |
| Returns a list of this project's services filtered |
| by the provided list of names, or all services if service_names is None |
| or []. |
| |
| If include_deps is specified, returns a list including the dependencies for |
| service_names, in order of dependency. |
| |
| Preserves the original order of self.services where possible, |
| reordering as needed to resolve dependencies. |
| |
| Raises NoSuchService if any of the named services do not exist. |
| """ |
| if service_names is None or len(service_names) == 0: |
| service_names = self.service_names |
| |
| unsorted = [self.get_service(name) for name in service_names] |
| services = [s for s in self.services if s in unsorted] |
| |
| if include_deps: |
| services = reduce(self._inject_deps, services, []) |
| |
| uniques = [] |
| [uniques.append(s) for s in services if s not in uniques] |
| |
| return uniques |
| |
| def get_services_without_duplicate(self, service_names=None, include_deps=False): |
| services = self.get_services(service_names, include_deps) |
| for service in services: |
| service.remove_duplicate_containers() |
| return services |
| |
| def get_links(self, service_dict): |
| links = [] |
| if 'links' in service_dict: |
| for link in service_dict.get('links', []): |
| if ':' in link: |
| service_name, link_name = link.split(':', 1) |
| else: |
| service_name, link_name = link, None |
| try: |
| links.append((self.get_service(service_name), link_name)) |
| except NoSuchService: |
| raise ConfigurationError( |
| 'Service "%s" has a link to service "%s" which does not ' |
| 'exist.' % (service_dict['name'], service_name)) |
| del service_dict['links'] |
| return links |
| |
| def get_network_mode(self, service_dict, networks): |
| network_mode = service_dict.pop('network_mode', None) |
| if not network_mode: |
| if self.networks.use_networking: |
| return NetworkMode(networks[0]) if networks else NetworkMode('none') |
| return NetworkMode(None) |
| |
| service_name = get_service_name_from_network_mode(network_mode) |
| if service_name: |
| return ServiceNetworkMode(self.get_service(service_name)) |
| |
| container_name = get_container_name_from_network_mode(network_mode) |
| if container_name: |
| try: |
| return ContainerNetworkMode(Container.from_id(self.client, container_name)) |
| except APIError: |
| raise ConfigurationError( |
| "Service '{name}' uses the network stack of container '{dep}' which " |
| "does not exist.".format(name=service_dict['name'], dep=container_name)) |
| |
| return NetworkMode(network_mode) |
| |
| def start(self, service_names=None, **options): |
| containers = [] |
| |
| def start_service(service): |
| service_containers = service.start(quiet=True, **options) |
| containers.extend(service_containers) |
| |
| services = self.get_services(service_names) |
| |
| def get_deps(service): |
| return {self.get_service(dep) for dep in service.get_dependency_names()} |
| |
| parallel.parallel_execute( |
| services, |
| start_service, |
| operator.attrgetter('name'), |
| 'Starting', |
| get_deps) |
| |
| return containers |
| |
| def stop(self, service_names=None, one_off=OneOffFilter.exclude, **options): |
| containers = self.containers(service_names, one_off=one_off) |
| |
| def get_deps(container): |
| # actually returning inversed dependencies |
| return {other for other in containers |
| if container.service in |
| self.get_service(other.service).get_dependency_names()} |
| |
| parallel.parallel_execute( |
| containers, |
| operator.methodcaller('stop', **options), |
| operator.attrgetter('name'), |
| 'Stopping', |
| get_deps) |
| |
| def pause(self, service_names=None, **options): |
| containers = self.containers(service_names) |
| parallel.parallel_pause(reversed(containers), options) |
| return containers |
| |
| def unpause(self, service_names=None, **options): |
| containers = self.containers(service_names) |
| parallel.parallel_unpause(containers, options) |
| return containers |
| |
| def kill(self, service_names=None, **options): |
| parallel.parallel_kill(self.containers(service_names), options) |
| |
| def remove_stopped(self, service_names=None, one_off=OneOffFilter.exclude, **options): |
| parallel.parallel_remove(self.containers( |
| service_names, stopped=True, one_off=one_off |
| ), options) |
| |
| def down(self, remove_image_type, include_volumes, remove_orphans=False): |
| self.stop(one_off=OneOffFilter.include) |
| self.find_orphan_containers(remove_orphans) |
| self.remove_stopped(v=include_volumes, one_off=OneOffFilter.include) |
| |
| self.networks.remove() |
| |
| if include_volumes: |
| self.volumes.remove() |
| |
| self.remove_images(remove_image_type) |
| |
| def remove_images(self, remove_image_type): |
| for service in self.get_services(): |
| service.remove_image(remove_image_type) |
| |
| def restart(self, service_names=None, **options): |
| containers = self.containers(service_names, stopped=True) |
| parallel.parallel_restart(containers, options) |
| return containers |
| |
| def build(self, service_names=None, no_cache=False, pull=False, force_rm=False): |
| for service in self.get_services(service_names): |
| if service.can_be_built(): |
| service.build(no_cache, pull, force_rm) |
| else: |
| log.info('%s uses an image, skipping' % service.name) |
| |
| def create( |
| self, |
| service_names=None, |
| strategy=ConvergenceStrategy.changed, |
| do_build=BuildAction.none, |
| ): |
| services = self.get_services_without_duplicate(service_names, include_deps=True) |
| |
| for svc in services: |
| svc.ensure_image_exists(do_build=do_build) |
| plans = self._get_convergence_plans(services, strategy) |
| |
| for service in services: |
| service.execute_convergence_plan( |
| plans[service.name], |
| detached=True, |
| start=False) |
| |
| def events(self, service_names=None): |
| def build_container_event(event, container): |
| time = datetime.datetime.fromtimestamp(event['time']) |
| time = time.replace( |
| microsecond=microseconds_from_time_nano(event['timeNano'])) |
| return { |
| 'time': time, |
| 'type': 'container', |
| 'action': event['status'], |
| 'id': container.id, |
| 'service': container.service, |
| 'attributes': { |
| 'name': container.name, |
| 'image': event['from'], |
| }, |
| 'container': container, |
| } |
| |
| service_names = set(service_names or self.service_names) |
| for event in self.client.events( |
| filters={'label': self.labels()}, |
| decode=True |
| ): |
| # The first part of this condition is a guard against some events |
| # broadcasted by swarm that don't have a status field. |
| # See https://github.com/docker/compose/issues/3316 |
| if 'status' not in event or event['status'] in IMAGE_EVENTS: |
| # We don't receive any image events because labels aren't applied |
| # to images |
| continue |
| |
| # TODO: get labels from the API v1.22 , see github issue 2618 |
| try: |
| # this can fail if the conatiner has been removed |
| container = Container.from_id(self.client, event['id']) |
| except APIError: |
| continue |
| if container.service not in service_names: |
| continue |
| yield build_container_event(event, container) |
| |
| def up(self, |
| service_names=None, |
| start_deps=True, |
| strategy=ConvergenceStrategy.changed, |
| do_build=BuildAction.none, |
| timeout=DEFAULT_TIMEOUT, |
| detached=False, |
| remove_orphans=False): |
| |
| warn_for_swarm_mode(self.client) |
| |
| self.initialize() |
| self.find_orphan_containers(remove_orphans) |
| |
| services = self.get_services_without_duplicate( |
| service_names, |
| include_deps=start_deps) |
| |
| for svc in services: |
| svc.ensure_image_exists(do_build=do_build) |
| plans = self._get_convergence_plans(services, strategy) |
| |
| def do(service): |
| return service.execute_convergence_plan( |
| plans[service.name], |
| timeout=timeout, |
| detached=detached |
| ) |
| |
| def get_deps(service): |
| return {self.get_service(dep) for dep in service.get_dependency_names()} |
| |
| results, errors = parallel.parallel_execute( |
| services, |
| do, |
| operator.attrgetter('name'), |
| None, |
| get_deps |
| ) |
| if errors: |
| raise ProjectError( |
| 'Encountered errors while bringing up the project.' |
| ) |
| |
| return [ |
| container |
| for svc_containers in results |
| if svc_containers is not None |
| for container in svc_containers |
| ] |
| |
| def initialize(self): |
| self.networks.initialize() |
| self.volumes.initialize() |
| |
| def _get_convergence_plans(self, services, strategy): |
| plans = {} |
| |
| for service in services: |
| updated_dependencies = [ |
| name |
| for name in service.get_dependency_names() |
| if name in plans and |
| plans[name].action in ('recreate', 'create') |
| ] |
| |
| if updated_dependencies and strategy.allows_recreate: |
| log.debug('%s has upstream changes (%s)', |
| service.name, |
| ", ".join(updated_dependencies)) |
| plan = service.convergence_plan(ConvergenceStrategy.always) |
| else: |
| plan = service.convergence_plan(strategy) |
| |
| plans[service.name] = plan |
| |
| return plans |
| |
| def pull(self, service_names=None, ignore_pull_failures=False): |
| for service in self.get_services(service_names, include_deps=False): |
| service.pull(ignore_pull_failures) |
| |
| def push(self, service_names=None, ignore_push_failures=False): |
| for service in self.get_services(service_names, include_deps=False): |
| service.push(ignore_push_failures) |
| |
| def _labeled_containers(self, stopped=False, one_off=OneOffFilter.exclude): |
| return list(filter(None, [ |
| Container.from_ps(self.client, container) |
| for container in self.client.containers( |
| all=stopped, |
| filters={'label': self.labels(one_off=one_off)})]) |
| ) |
| |
| def containers(self, service_names=None, stopped=False, one_off=OneOffFilter.exclude): |
| if service_names: |
| self.validate_service_names(service_names) |
| else: |
| service_names = self.service_names |
| |
| containers = self._labeled_containers(stopped, one_off) |
| |
| def matches_service_names(container): |
| return container.labels.get(LABEL_SERVICE) in service_names |
| |
| return [c for c in containers if matches_service_names(c)] |
| |
| def find_orphan_containers(self, remove_orphans): |
| def _find(): |
| containers = self._labeled_containers() |
| for ctnr in containers: |
| service_name = ctnr.labels.get(LABEL_SERVICE) |
| if service_name not in self.service_names: |
| yield ctnr |
| orphans = list(_find()) |
| if not orphans: |
| return |
| if remove_orphans: |
| for ctnr in orphans: |
| log.info('Removing orphan container "{0}"'.format(ctnr.name)) |
| ctnr.kill() |
| ctnr.remove(force=True) |
| else: |
| log.warning( |
| 'Found orphan containers ({0}) for this project. If ' |
| 'you removed or renamed this service in your compose ' |
| 'file, you can run this command with the ' |
| '--remove-orphans flag to clean it up.'.format( |
| ', '.join(["{}".format(ctnr.name) for ctnr in orphans]) |
| ) |
| ) |
| |
| def _inject_deps(self, acc, service): |
| dep_names = service.get_dependency_names() |
| |
| if len(dep_names) > 0: |
| dep_services = self.get_services( |
| service_names=list(set(dep_names)), |
| include_deps=True |
| ) |
| else: |
| dep_services = [] |
| |
| dep_services.append(service) |
| return acc + dep_services |
| |
| |
| def get_volumes_from(project, service_dict): |
| volumes_from = service_dict.pop('volumes_from', None) |
| if not volumes_from: |
| return [] |
| |
| def build_volume_from(spec): |
| if spec.type == 'service': |
| try: |
| return spec._replace(source=project.get_service(spec.source)) |
| except NoSuchService: |
| pass |
| |
| if spec.type == 'container': |
| try: |
| container = Container.from_id(project.client, spec.source) |
| return spec._replace(source=container) |
| except APIError: |
| pass |
| |
| raise ConfigurationError( |
| "Service \"{}\" mounts volumes from \"{}\", which is not the name " |
| "of a service or container.".format( |
| service_dict['name'], |
| spec.source)) |
| |
| return [build_volume_from(vf) for vf in volumes_from] |
| |
| |
| def warn_for_swarm_mode(client): |
| info = client.info() |
| if info.get('Swarm', {}).get('LocalNodeState') == 'active': |
| log.warn( |
| "The Docker Engine you're using is running in swarm mode.\n\n" |
| "Compose does not use swarm mode to deploy services to multiple nodes in a swarm. " |
| "All containers will be scheduled on the current node.\n\n" |
| "To deploy your application across the swarm, " |
| "use the bundle feature of the Docker experimental build.\n\n" |
| "More info:\n" |
| "https://docs.docker.com/compose/bundles\n" |
| ) |
| |
| |
| class NoSuchService(Exception): |
| def __init__(self, name): |
| self.name = name |
| self.msg = "No such service: %s" % self.name |
| |
| def __str__(self): |
| return self.msg |
| |
| |
| class ProjectError(Exception): |
| def __init__(self, msg): |
| self.msg = msg |