| package container |
| |
| import ( |
| "bufio" |
| "bytes" |
| "encoding/binary" |
| "fmt" |
| "io" |
| "os" |
| "strconv" |
| "strings" |
| "time" |
| |
| "github.com/docker/docker/api/types" |
| "github.com/docker/docker/api/types/events" |
| executorpkg "github.com/docker/docker/daemon/cluster/executor" |
| "github.com/docker/go-connections/nat" |
| "github.com/docker/libnetwork" |
| "github.com/docker/swarmkit/agent/exec" |
| "github.com/docker/swarmkit/api" |
| "github.com/docker/swarmkit/log" |
| "github.com/docker/swarmkit/protobuf/ptypes" |
| "github.com/pkg/errors" |
| "golang.org/x/net/context" |
| "golang.org/x/time/rate" |
| ) |
| |
| // controller implements agent.Controller against docker's API. |
| // |
| // Most operations against docker's API are done through the container name, |
| // which is unique to the task. |
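//
// A controller is typically driven through the exec.Controller lifecycle:
// Prepare, then Start, then Wait, with Shutdown, Remove, and Close handling
// teardown.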
| type controller struct { |
| task *api.Task |
| adapter *containerAdapter |
| closed chan struct{} |
| err error |
| |
| pulled chan struct{} // closed after pull |
| cancelPull func() // cancels pull context if not nil |
| pullErr error // pull error, only read after pulled closed |
| } |
| |
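// Compile-time assertion that controller implements exec.Controller.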
| var _ exec.Controller = &controller{} |
| |
// newController returns a docker exec runner for the provided task.
| func newController(b executorpkg.Backend, task *api.Task, secrets exec.SecretGetter) (*controller, error) { |
| adapter, err := newContainerAdapter(b, task, secrets) |
| if err != nil { |
| return nil, err |
| } |
| |
| return &controller{ |
| task: task, |
| adapter: adapter, |
| closed: make(chan struct{}), |
| }, nil |
| } |
| |
| func (r *controller) Task() (*api.Task, error) { |
| return r.task, nil |
| } |
| |
| // ContainerStatus returns the container-specific status for the task. |
| func (r *controller) ContainerStatus(ctx context.Context) (*api.ContainerStatus, error) { |
| ctnr, err := r.adapter.inspect(ctx) |
| if err != nil { |
| if isUnknownContainer(err) { |
| return nil, nil |
| } |
| return nil, err |
| } |
| return parseContainerStatus(ctnr) |
| } |
| |
| func (r *controller) PortStatus(ctx context.Context) (*api.PortStatus, error) { |
| ctnr, err := r.adapter.inspect(ctx) |
| if err != nil { |
| if isUnknownContainer(err) { |
| return nil, nil |
| } |
| |
| return nil, err |
| } |
| |
| return parsePortStatus(ctnr) |
| } |
| |
// Update takes a recent task update and applies it to the container.
| func (r *controller) Update(ctx context.Context, t *api.Task) error { |
| // TODO(stevvooe): While assignment of tasks is idempotent, we do allow |
| // updates of metadata, such as labelling, as well as any other properties |
| // that make sense. |
| return nil |
| } |
| |
| // Prepare creates a container and ensures the image is pulled. |
| // |
// If the container has already been created, exec.ErrTaskPrepared is returned.
| func (r *controller) Prepare(ctx context.Context) error { |
| if err := r.checkClosed(); err != nil { |
| return err |
| } |
| |
| // Make sure all the networks that the task needs are created. |
| if err := r.adapter.createNetworks(ctx); err != nil { |
| return err |
| } |
| |
| // Make sure all the volumes that the task needs are created. |
| if err := r.adapter.createVolumes(ctx); err != nil { |
| return err |
| } |
| |
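	// Setting DOCKER_SERVICE_PREFER_OFFLINE_IMAGE=1 skips the pull entirely
	// and uses whatever image is already present on the node.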
| if os.Getenv("DOCKER_SERVICE_PREFER_OFFLINE_IMAGE") != "1" { |
| if r.pulled == nil { |
			// Fork the pull to a different context to allow the pull to
			// continue across re-entrant calls to Prepare. This ensures that
			// Prepare is idempotent and does not incur the extra cost of
			// pulling when cancelled on updates.
| var pctx context.Context |
| |
| r.pulled = make(chan struct{}) |
| pctx, r.cancelPull = context.WithCancel(context.Background()) // TODO(stevvooe): Bind a context to the entire controller. |
| |
| go func() { |
| defer close(r.pulled) |
| r.pullErr = r.adapter.pullImage(pctx) // protected by closing r.pulled |
| }() |
| } |
| |
| select { |
| case <-ctx.Done(): |
| return ctx.Err() |
| case <-r.pulled: |
| if r.pullErr != nil { |
| // NOTE(stevvooe): We always try to pull the image to make sure we have |
| // the most up to date version. This will return an error, but we only |
| // log it. If the image truly doesn't exist, the create below will |
| // error out. |
| // |
| // This gives us some nice behavior where we use up to date versions of |
| // mutable tags, but will still run if the old image is available but a |
| // registry is down. |
| // |
| // If you don't want this behavior, lock down your image to an |
| // immutable tag or digest. |
| log.G(ctx).WithError(r.pullErr).Error("pulling image failed") |
| } |
| } |
| } |
| |
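	// Create the container. A name conflict means a previous Prepare already
	// created it: inspect to confirm it exists, then report ErrTaskPrepared.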
| if err := r.adapter.create(ctx); err != nil { |
| if isContainerCreateNameConflict(err) { |
| if _, err := r.adapter.inspect(ctx); err != nil { |
| return err |
| } |
| |
| // container is already created. success! |
| return exec.ErrTaskPrepared |
| } |
| |
| return err |
| } |
| |
| return nil |
| } |
| |
| // Start the container. An error will be returned if the container is already started. |
| func (r *controller) Start(ctx context.Context) error { |
| if err := r.checkClosed(); err != nil { |
| return err |
| } |
| |
| ctnr, err := r.adapter.inspect(ctx) |
| if err != nil { |
| return err |
| } |
| |
| // Detect whether the container has *ever* been started. If so, we don't |
| // issue the start. |
| // |
	// TODO(stevvooe): This is very racy. While reading inspect, another
	// caller could start the process and we could end up starting it twice.
| if ctnr.State.Status != "created" { |
| return exec.ErrTaskStarted |
| } |
| |
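	// Retry the start until it succeeds or fails for a reason other than a
	// missing network.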
| for { |
| if err := r.adapter.start(ctx); err != nil { |
| if _, ok := err.(libnetwork.ErrNoSuchNetwork); ok { |
				// Retry network creation if the start failed
				// because some of the networks were not found.
| if err := r.adapter.createNetworks(ctx); err != nil { |
| return err |
| } |
| |
| continue |
| } |
| |
| return errors.Wrap(err, "starting container failed") |
| } |
| |
| break |
| } |
| |
| // no health check |
| if ctnr.Config == nil || ctnr.Config.Healthcheck == nil { |
| if err := r.adapter.activateServiceBinding(); err != nil { |
| log.G(ctx).WithError(err).Errorf("failed to activate service binding for container %s which has no healthcheck config", r.adapter.container.name()) |
| return err |
| } |
| return nil |
| } |
| |
| healthCmd := ctnr.Config.Healthcheck.Test |
| |
| if len(healthCmd) == 0 || healthCmd[0] == "NONE" { |
| return nil |
| } |
| |
| // wait for container to be healthy |
| eventq := r.adapter.events(ctx) |
| |
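	// healthErr records an unhealthy result so that the subsequent "die"
	// event can surface it as the cause of the exit.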
| var healthErr error |
| for { |
| select { |
| case event := <-eventq: |
| if !r.matchevent(event) { |
| continue |
| } |
| |
| switch event.Action { |
| case "die": // exit on terminal events |
| ctnr, err := r.adapter.inspect(ctx) |
| if err != nil { |
| return errors.Wrap(err, "die event received") |
| } else if ctnr.State.ExitCode != 0 { |
| return &exitError{code: ctnr.State.ExitCode, cause: healthErr} |
| } |
| |
| return nil |
| case "destroy": |
				// If we get here, something has gone wrong but we want to exit
				// and report anyway.
| return ErrContainerDestroyed |
| case "health_status: unhealthy": |
| // in this case, we stop the container and report unhealthy status |
| if err := r.Shutdown(ctx); err != nil { |
| return errors.Wrap(err, "unhealthy container shutdown failed") |
| } |
| // set health check error, and wait for container to fully exit ("die" event) |
| healthErr = ErrContainerUnhealthy |
| case "health_status: healthy": |
| if err := r.adapter.activateServiceBinding(); err != nil { |
| log.G(ctx).WithError(err).Errorf("failed to activate service binding for container %s after healthy event", r.adapter.container.name()) |
| return err |
| } |
| return nil |
| } |
| case <-ctx.Done(): |
| return ctx.Err() |
| case <-r.closed: |
| return r.err |
| } |
| } |
| } |
| |
| // Wait on the container to exit. |
| func (r *controller) Wait(pctx context.Context) error { |
| if err := r.checkClosed(); err != nil { |
| return err |
| } |
| |
| ctx, cancel := context.WithCancel(pctx) |
| defer cancel() |
| |
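	// healthErr is buffered so the health-check goroutine can report without
	// blocking, even if Wait has already returned.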
| healthErr := make(chan error, 1) |
| go func() { |
		ectx, cancel := context.WithCancel(ctx) // cancel event subscription when this goroutine exits
| defer cancel() |
| if err := r.checkHealth(ectx); err == ErrContainerUnhealthy { |
| healthErr <- ErrContainerUnhealthy |
| if err := r.Shutdown(ectx); err != nil { |
| log.G(ectx).WithError(err).Debug("shutdown failed on unhealthy") |
| } |
| } |
| }() |
| |
| err := r.adapter.wait(ctx) |
| if ctx.Err() != nil { |
| return ctx.Err() |
| } |
| |
| if err != nil { |
| ee := &exitError{} |
| if ec, ok := err.(exec.ExitCoder); ok { |
| ee.code = ec.ExitCode() |
| } |
| select { |
| case e := <-healthErr: |
| ee.cause = e |
| default: |
| if err.Error() != "" { |
| ee.cause = err |
| } |
| } |
| return ee |
| } |
| |
| return nil |
| } |
| |
| // Shutdown the container cleanly. |
| func (r *controller) Shutdown(ctx context.Context) error { |
| if err := r.checkClosed(); err != nil { |
| return err |
| } |
| |
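	// Cancel any in-flight pull; shutdown should not wait on an image
	// download.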
| if r.cancelPull != nil { |
| r.cancelPull() |
| } |
| |
| // remove container from service binding |
| if err := r.adapter.deactivateServiceBinding(); err != nil { |
| log.G(ctx).WithError(err).Errorf("failed to deactivate service binding for container %s", r.adapter.container.name()) |
| return err |
| } |
| |
| if err := r.adapter.shutdown(ctx); err != nil { |
| if isUnknownContainer(err) || isStoppedContainer(err) { |
| return nil |
| } |
| |
| return err |
| } |
| |
| return nil |
| } |
| |
| // Terminate the container, with force. |
| func (r *controller) Terminate(ctx context.Context) error { |
| if err := r.checkClosed(); err != nil { |
| return err |
| } |
| |
| if r.cancelPull != nil { |
| r.cancelPull() |
| } |
| |
| if err := r.adapter.terminate(ctx); err != nil { |
| if isUnknownContainer(err) { |
| return nil |
| } |
| |
| return err |
| } |
| |
| return nil |
| } |
| |
| // Remove the container and its resources. |
| func (r *controller) Remove(ctx context.Context) error { |
| if err := r.checkClosed(); err != nil { |
| return err |
| } |
| |
| if r.cancelPull != nil { |
| r.cancelPull() |
| } |
| |
| // It may be necessary to shut down the task before removing it. |
| if err := r.Shutdown(ctx); err != nil { |
| if isUnknownContainer(err) { |
| return nil |
| } |
| // This may fail if the task was already shut down. |
| log.G(ctx).WithError(err).Debug("shutdown failed on removal") |
| } |
| |
	// Try removing the networks referenced in this task in case this
	// task is the last one referencing them.
| if err := r.adapter.removeNetworks(ctx); err != nil { |
| if isUnknownContainer(err) { |
| return nil |
| } |
| return err |
| } |
| |
| if err := r.adapter.remove(ctx); err != nil { |
| if isUnknownContainer(err) { |
| return nil |
| } |
| |
| return err |
| } |
| return nil |
| } |
| |
// waitReady waits for a container to be "ready".
// Ready means the container has started (or has already exited).
| func (r *controller) waitReady(pctx context.Context) error { |
| if err := r.checkClosed(); err != nil { |
| return err |
| } |
| |
| ctx, cancel := context.WithCancel(pctx) |
| defer cancel() |
| |
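	// Subscribe to events before inspecting so that a "start" landing between
	// the two calls is not missed.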
| eventq := r.adapter.events(ctx) |
| |
| ctnr, err := r.adapter.inspect(ctx) |
| if err != nil { |
| if !isUnknownContainer(err) { |
| return errors.Wrap(err, "inspect container failed") |
| } |
| } else { |
| switch ctnr.State.Status { |
| case "running", "exited", "dead": |
| return nil |
| } |
| } |
| |
| for { |
| select { |
| case event := <-eventq: |
| if !r.matchevent(event) { |
| continue |
| } |
| |
| switch event.Action { |
| case "start": |
| return nil |
| } |
| case <-ctx.Done(): |
| return ctx.Err() |
| case <-r.closed: |
| return r.err |
| } |
| } |
| } |
| |
| func (r *controller) Logs(ctx context.Context, publisher exec.LogPublisher, options api.LogSubscriptionOptions) error { |
| if err := r.checkClosed(); err != nil { |
| return err |
| } |
| |
| if err := r.waitReady(ctx); err != nil { |
| return errors.Wrap(err, "container not ready for logs") |
| } |
| |
| rc, err := r.adapter.logs(ctx, options) |
| if err != nil { |
| return errors.Wrap(err, "failed getting container logs") |
| } |
| defer rc.Close() |
| |
	var (
		// Use a rate limiter to keep throughput under control and to provide
		// some ability to coalesce messages: 10 MB/s sustained with a 10 MB
		// burst.
		limiter = rate.NewLimiter(rate.Limit(10<<20), 10<<20)
| msgctx = api.LogContext{ |
| NodeID: r.task.NodeID, |
| ServiceID: r.task.ServiceID, |
| TaskID: r.task.ID, |
| } |
| ) |
| |
| brd := bufio.NewReader(rc) |
| for { |
		// Each message is framed with an 8-byte header: read it as a
		// big-endian uint64 and pull the stream descriptor off the MSB.
| var header uint64 |
| if err := binary.Read(brd, binary.BigEndian, &header); err != nil { |
| if err == io.EOF { |
| return nil |
| } |
| |
| return errors.Wrap(err, "failed reading log header") |
| } |
| |
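		// Docker's stdcopy framing places the stream descriptor in byte 0 and
		// a big-endian uint32 payload size in bytes 4-7 (bytes 1-3 are zero),
		// so the top byte of the uint64 is the stream and the low seven bytes
		// are the size.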
| stream, size := (header>>(7<<3))&0xFF, header & ^(uint64(0xFF)<<(7<<3)) |
| |
		// Apply the limit here to decrease allocation back pressure.
| if err := limiter.WaitN(ctx, int(size)); err != nil { |
| return errors.Wrap(err, "failed rate limiter") |
| } |
| |
| buf := make([]byte, size) |
| _, err := io.ReadFull(brd, buf) |
| if err != nil { |
| return errors.Wrap(err, "failed reading buffer") |
| } |
| |
		// The timestamp is RFC3339Nano followed by a single space. Split it
		// off, parse it, and publish the remainder.
| parts := bytes.SplitN(buf, []byte(" "), 2) |
| if len(parts) != 2 { |
			return fmt.Errorf("invalid timestamp in log message: %q", buf)
| } |
| |
| ts, err := time.Parse(time.RFC3339Nano, string(parts[0])) |
| if err != nil { |
| return errors.Wrap(err, "failed to parse timestamp") |
| } |
| |
| tsp, err := ptypes.TimestampProto(ts) |
| if err != nil { |
| return errors.Wrap(err, "failed to convert timestamp") |
| } |
| |
| if err := publisher.Publish(ctx, api.LogMessage{ |
| Context: msgctx, |
| Timestamp: tsp, |
| Stream: api.LogStream(stream), |
| |
| Data: parts[1], |
| }); err != nil { |
| return errors.Wrap(err, "failed to publish log message") |
| } |
| } |
| } |
| |
// Close the runner and clean up any ephemeral resources. Subsequent calls
// return exec.ErrControllerClosed.
| func (r *controller) Close() error { |
| select { |
| case <-r.closed: |
| return r.err |
| default: |
| if r.cancelPull != nil { |
| r.cancelPull() |
| } |
| |
| r.err = exec.ErrControllerClosed |
| close(r.closed) |
| } |
| return nil |
| } |
| |
| func (r *controller) matchevent(event events.Message) bool { |
| if event.Type != events.ContainerEventType { |
| return false |
| } |
| |
| // TODO(stevvooe): Filter based on ID matching, in addition to name. |
| |
| // Make sure the events are for this container. |
| if event.Actor.Attributes["name"] != r.adapter.container.name() { |
| return false |
| } |
| |
| return true |
| } |
| |
| func (r *controller) checkClosed() error { |
| select { |
| case <-r.closed: |
| return r.err |
| default: |
| return nil |
| } |
| } |
| |
| func parseContainerStatus(ctnr types.ContainerJSON) (*api.ContainerStatus, error) { |
| status := &api.ContainerStatus{ |
| ContainerID: ctnr.ID, |
| PID: int32(ctnr.State.Pid), |
| ExitCode: int32(ctnr.State.ExitCode), |
| } |
| |
| return status, nil |
| } |
| |
| func parsePortStatus(ctnr types.ContainerJSON) (*api.PortStatus, error) { |
| status := &api.PortStatus{} |
| |
| if ctnr.NetworkSettings != nil && len(ctnr.NetworkSettings.Ports) > 0 { |
| exposedPorts, err := parsePortMap(ctnr.NetworkSettings.Ports) |
| if err != nil { |
| return nil, err |
| } |
| status.Ports = exposedPorts |
| } |
| |
| return status, nil |
| } |
| |
| func parsePortMap(portMap nat.PortMap) ([]*api.PortConfig, error) { |
| exposedPorts := make([]*api.PortConfig, 0, len(portMap)) |
| |
| for portProtocol, mapping := range portMap { |
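		// nat.PortMap keys have the form "<port>/<protocol>", e.g. "8080/tcp".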
| parts := strings.SplitN(string(portProtocol), "/", 2) |
| if len(parts) != 2 { |
| return nil, fmt.Errorf("invalid port mapping: %s", portProtocol) |
| } |
| |
| port, err := strconv.ParseUint(parts[0], 10, 16) |
| if err != nil { |
| return nil, err |
| } |
| |
| protocol := api.ProtocolTCP |
| switch strings.ToLower(parts[1]) { |
| case "tcp": |
| protocol = api.ProtocolTCP |
| case "udp": |
| protocol = api.ProtocolUDP |
| default: |
| return nil, fmt.Errorf("invalid protocol: %s", parts[1]) |
| } |
| |
| for _, binding := range mapping { |
| hostPort, err := strconv.ParseUint(binding.HostPort, 10, 16) |
| if err != nil { |
| return nil, err |
| } |
| |
| // TODO(aluzzardi): We're losing the port `name` here since |
| // there's no way to retrieve it back from the Engine. |
| exposedPorts = append(exposedPorts, &api.PortConfig{ |
| PublishMode: api.PublishModeHost, |
| Protocol: protocol, |
| TargetPort: uint32(port), |
| PublishedPort: uint32(hostPort), |
| }) |
| } |
| } |
| |
| return exposedPorts, nil |
| } |
| |
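// exitError carries a container's non-zero exit code and, when a failed
// health check caused the exit, that error as its cause. It satisfies
// exec.ExitCoder.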
| type exitError struct { |
| code int |
| cause error |
| } |
| |
| func (e *exitError) Error() string { |
| if e.cause != nil { |
| return fmt.Sprintf("task: non-zero exit (%v): %v", e.code, e.cause) |
| } |
| |
| return fmt.Sprintf("task: non-zero exit (%v)", e.code) |
| } |
| |
| func (e *exitError) ExitCode() int { |
	return e.code
| } |
| |
| func (e *exitError) Cause() error { |
| return e.cause |
| } |
| |
// checkHealth blocks until the container is reported unhealthy, the context
// is cancelled, or the controller is closed.
| func (r *controller) checkHealth(ctx context.Context) error { |
| eventq := r.adapter.events(ctx) |
| |
| for { |
| select { |
| case <-ctx.Done(): |
| return nil |
| case <-r.closed: |
| return nil |
| case event := <-eventq: |
| if !r.matchevent(event) { |
| continue |
| } |
| |
| switch event.Action { |
| case "health_status: unhealthy": |
| return ErrContainerUnhealthy |
| } |
| } |
| } |
| } |