blob: 99c5ae93076af8f86cfadff8f7208efc6623dfce [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package bufplugindocker
import (
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"strings"
"sync"
"time"
)
import (
"github.com/docker/docker/api/types"
"github.com/docker/docker/client"
"github.com/docker/docker/pkg/jsonmessage"
"github.com/docker/docker/pkg/stringid"
"go.uber.org/multierr"
"go.uber.org/zap"
)
import (
"github.com/apache/dubbo-kubernetes/pkg/bufman/bufpkg/bufplugin/bufpluginconfig"
)
const (
// Setting this value on the buf docker client allows us to propagate a custom
// value to the OCI registryv1alpha1. This is a useful property that enables registries
// to differentiate between the buf cli vs other tools like docker cli.
// Note, this does not override the final User-Agent entirely, but instead adds
// the value to the final outgoing User-Agent value in the form: [docker client's UA] UpstreamClient(buf-cli-1.11.0)
//
// Example: User-Agent = [docker/20.10.21 go/go1.18.7 git-commit/3056208 kernel/5.15.49-linuxkit os/linux arch/arm64 UpstreamClient(buf-cli-1.11.0)]
BufUpstreamClientUserAgentPrefix = "buf-cli-"
)
// Client is a small abstraction over a Docker API client, providing the basic APIs we need to build plugins.
// It ensures that we pass the appropriate parameters to build images (i.e. platform 'linux/amd64').
type Client interface {
// Load imports a Docker image into the local Docker Engine.
Load(ctx context.Context, image io.Reader) (*LoadResponse, error)
// Push the Docker image to the remote registryv1alpha1.
Push(ctx context.Context, image string, auth *RegistryAuthConfig) (*PushResponse, error)
// Delete removes the Docker image from local Docker Engine.
Delete(ctx context.Context, image string) (*DeleteResponse, error)
// Tag creates a Docker image tag from an existing image and plugin config.
Tag(ctx context.Context, image string, config *bufpluginconfig.Config) (*TagResponse, error)
// Inspect inspects an image and returns the image id.
Inspect(ctx context.Context, image string) (*InspectResponse, error)
// Close releases any resources used by the underlying Docker client.
Close() error
}
// LoadResponse returns details of a successful load image call.
type LoadResponse struct {
// ImageID specifies the Docker image id in the format <hash_algorithm>:<hash>.
// Example: sha256:65001659f150f085e0b37b697a465a95cbfd885d9315b61960883b9ac588744e
ImageID string
}
// PushResponse is a placeholder for data to be returned from a successful image push call.
type PushResponse struct {
// Digest specifies the Docker image digest in the format <hash_algorithm>:<hash>.
// The digest returned from Client.Push differs from the image id returned in Client.Build.
Digest string
}
// TagResponse returns details of a successful image tag call.
type TagResponse struct {
// Image contains the Docker image name in the local Docker engine including the tag.
// It is created from the bufpluginconfig.Config's Name.IdentityString() and a unique id.
Image string
}
// DeleteResponse is a placeholder for data to be returned from a successful image delete call.
type DeleteResponse struct{}
// InspectResponse returns the image id for a given image.
type InspectResponse struct {
// ImageID contains the Docker image's ID.
ImageID string
}
type dockerAPIClient struct {
cli *client.Client
logger *zap.Logger
lock sync.RWMutex // protects negotiated
negotiated bool
}
var _ Client = (*dockerAPIClient)(nil)
func (d *dockerAPIClient) Load(ctx context.Context, image io.Reader) (_ *LoadResponse, retErr error) {
if err := d.negotiateVersion(ctx); err != nil {
return nil, err
}
response, err := d.cli.ImageLoad(ctx, image, true)
if err != nil {
return nil, err
}
defer func() {
if err := response.Body.Close(); err != nil {
retErr = multierr.Append(retErr, fmt.Errorf("docker load response body close error: %w", err))
}
}()
imageID := ""
responseScanner := bufio.NewScanner(response.Body)
for responseScanner.Scan() {
var jsonMessage jsonmessage.JSONMessage
if err := json.Unmarshal(responseScanner.Bytes(), &jsonMessage); err == nil {
_, loadedImageID, found := strings.Cut(strings.TrimSpace(jsonMessage.Stream), "Loaded image ID: ")
if !found {
continue
}
if !strings.HasPrefix(loadedImageID, "sha256:") {
d.logger.Sugar().Warn("Unsupported image digest", zap.String("imageID", loadedImageID))
continue
}
if err := stringid.ValidateID(strings.TrimPrefix(loadedImageID, "sha256:")); err != nil {
d.logger.Sugar().Warn("Invalid image id", zap.String("imageID", loadedImageID))
continue
}
imageID = loadedImageID
}
}
if err := responseScanner.Err(); err != nil {
return nil, err
}
if imageID == "" {
return nil, fmt.Errorf("failed to determine image ID of loaded image")
}
return &LoadResponse{ImageID: imageID}, nil
}
func (d *dockerAPIClient) Tag(ctx context.Context, image string, config *bufpluginconfig.Config) (*TagResponse, error) {
if err := d.negotiateVersion(ctx); err != nil {
return nil, err
}
buildID := stringid.GenerateRandomID()
imageName := config.Name.IdentityString() + ":" + buildID
if err := d.cli.ImageTag(ctx, image, imageName); err != nil {
return nil, err
}
return &TagResponse{Image: imageName}, nil
}
func (d *dockerAPIClient) Push(ctx context.Context, image string, auth *RegistryAuthConfig) (response *PushResponse, retErr error) {
if err := d.negotiateVersion(ctx); err != nil {
return nil, err
}
registryAuth, err := auth.ToHeader()
if err != nil {
return nil, err
}
pushReader, err := d.cli.ImagePush(ctx, image, types.ImagePushOptions{
RegistryAuth: registryAuth,
})
if err != nil {
return nil, err
}
defer func() {
retErr = multierr.Append(retErr, pushReader.Close())
}()
var imageDigest string
pushScanner := bufio.NewScanner(pushReader)
for pushScanner.Scan() {
d.logger.Sugar().Debug(pushScanner.Text())
var message jsonmessage.JSONMessage
if err := json.Unmarshal([]byte(pushScanner.Text()), &message); err == nil {
if message.Error != nil {
return nil, message.Error
}
if message.Aux != nil {
var pushResult types.PushResult
if err := json.Unmarshal(*message.Aux, &pushResult); err == nil {
imageDigest = pushResult.Digest
}
}
}
}
if err := pushScanner.Err(); err != nil {
return nil, err
}
if len(imageDigest) == 0 {
return nil, fmt.Errorf("failed to determine image digest after push")
}
return &PushResponse{Digest: imageDigest}, nil
}
func (d *dockerAPIClient) Delete(ctx context.Context, image string) (*DeleteResponse, error) {
if err := d.negotiateVersion(ctx); err != nil {
return nil, err
}
_, err := d.cli.ImageRemove(ctx, image, types.ImageRemoveOptions{})
if err != nil {
return nil, err
}
return &DeleteResponse{}, nil
}
func (d *dockerAPIClient) Inspect(ctx context.Context, image string) (*InspectResponse, error) {
if err := d.negotiateVersion(ctx); err != nil {
return nil, err
}
inspect, _, err := d.cli.ImageInspectWithRaw(ctx, image)
if err != nil {
return nil, err
}
return &InspectResponse{ImageID: inspect.ID}, nil
}
func (d *dockerAPIClient) Close() error {
return d.cli.Close()
}
func (d *dockerAPIClient) negotiateVersion(ctx context.Context) error {
d.lock.RLock()
negotiated := d.negotiated
d.lock.RUnlock()
if negotiated {
return nil
}
d.lock.Lock()
defer d.lock.Unlock()
if d.negotiated {
return nil
}
deadline := time.Now().Add(5 * time.Second)
if existingDeadline, ok := ctx.Deadline(); ok {
if existingDeadline.Before(deadline) {
deadline = existingDeadline
}
}
ctx, cancel := context.WithDeadline(context.Background(), deadline)
defer cancel()
ping, err := d.cli.Ping(ctx)
if err != nil {
return err
}
d.cli.NegotiateAPIVersionPing(ping)
d.negotiated = true
return nil
}
// NewClient creates a new Client to use to build Docker plugins.
func NewClient(logger *zap.Logger, cliVersion string, options ...ClientOption) (Client, error) {
if logger == nil {
return nil, errors.New("logger required")
}
opts := &clientOptions{}
for _, option := range options {
option(opts)
}
dockerClientOpts := []client.Opt{
client.FromEnv,
client.WithHTTPHeaders(map[string]string{
"User-Agent": BufUpstreamClientUserAgentPrefix + cliVersion,
}),
}
if len(opts.host) > 0 {
dockerClientOpts = append(dockerClientOpts, client.WithHost(opts.host))
}
if len(opts.version) > 0 {
dockerClientOpts = append(dockerClientOpts, client.WithVersion(opts.version))
}
cli, err := client.NewClientWithOpts(dockerClientOpts...)
if err != nil {
return nil, err
}
return &dockerAPIClient{
cli: cli,
logger: logger,
}, nil
}
type clientOptions struct {
host string
version string
}
// ClientOption defines options for the NewClient call to customize the underlying Docker client.
type ClientOption func(options *clientOptions)
// WithHost allows specifying a Docker engine host to connect to (instead of the default lookup using DOCKER_HOST env var).
// This makes it suitable for use by parallel tests.
func WithHost(host string) ClientOption {
return func(options *clientOptions) {
options.host = host
}
}
// WithVersion allows specifying a Docker API client version instead of using the default version negotiation algorithm.
// This allows tests to implement the Docker engine API using stable URLs.
func WithVersion(version string) ClientOption {
return func(options *clientOptions) {
options.version = version
}
}