| package storage |
| |
| import ( |
| "context" |
| "fmt" |
| "net/http" |
| "path" |
| "time" |
| |
| "github.com/docker/distribution" |
| dcontext "github.com/docker/distribution/context" |
| "github.com/docker/distribution/reference" |
| "github.com/docker/distribution/registry/storage/driver" |
| "github.com/docker/distribution/uuid" |
| "github.com/opencontainers/go-digest" |
| ) |
| |
| // linkPathFunc describes a function that can resolve a link based on the |
| // repository name and digest. |
| type linkPathFunc func(name string, dgst digest.Digest) (string, error) |
| |
| // linkedBlobStore provides a full BlobService that namespaces the blobs to a |
| // given repository. Effectively, it manages the links in a given repository |
| // that grant access to the global blob store. |
| type linkedBlobStore struct { |
| *blobStore |
| registry *registry |
| blobServer distribution.BlobServer |
| blobAccessController distribution.BlobDescriptorService |
| repository distribution.Repository |
| ctx context.Context // only to be used where context can't come through method args |
| deleteEnabled bool |
| resumableDigestEnabled bool |
| |
| // linkPathFns specifies one or more path functions allowing one to |
| // control the repository blob link set to which the blob store |
| // dispatches. This is required because manifest and layer blobs have not |
| // yet been fully merged. At some point, this functionality should be |
| // removed the blob links folder should be merged. The first entry is |
| // treated as the "canonical" link location and will be used for writes. |
| linkPathFns []linkPathFunc |
| |
| // linkDirectoryPathSpec locates the root directories in which one might find links |
| linkDirectoryPathSpec pathSpec |
| } |
| |
| var _ distribution.BlobStore = &linkedBlobStore{} |
| |
| func (lbs *linkedBlobStore) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) { |
| return lbs.blobAccessController.Stat(ctx, dgst) |
| } |
| |
| func (lbs *linkedBlobStore) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) { |
| canonical, err := lbs.Stat(ctx, dgst) // access check |
| if err != nil { |
| return nil, err |
| } |
| |
| return lbs.blobStore.Get(ctx, canonical.Digest) |
| } |
| |
| func (lbs *linkedBlobStore) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) { |
| canonical, err := lbs.Stat(ctx, dgst) // access check |
| if err != nil { |
| return nil, err |
| } |
| |
| return lbs.blobStore.Open(ctx, canonical.Digest) |
| } |
| |
| func (lbs *linkedBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error { |
| canonical, err := lbs.Stat(ctx, dgst) // access check |
| if err != nil { |
| return err |
| } |
| |
| if canonical.MediaType != "" { |
| // Set the repository local content type. |
| w.Header().Set("Content-Type", canonical.MediaType) |
| } |
| |
| return lbs.blobServer.ServeBlob(ctx, w, r, canonical.Digest) |
| } |
| |
| func (lbs *linkedBlobStore) Put(ctx context.Context, mediaType string, p []byte) (distribution.Descriptor, error) { |
| dgst := digest.FromBytes(p) |
| // Place the data in the blob store first. |
| desc, err := lbs.blobStore.Put(ctx, mediaType, p) |
| if err != nil { |
| dcontext.GetLogger(ctx).Errorf("error putting into main store: %v", err) |
| return distribution.Descriptor{}, err |
| } |
| |
| if err := lbs.blobAccessController.SetDescriptor(ctx, dgst, desc); err != nil { |
| return distribution.Descriptor{}, err |
| } |
| |
| // TODO(stevvooe): Write out mediatype if incoming differs from what is |
| // returned by Put above. Note that we should allow updates for a given |
| // repository. |
| |
| return desc, lbs.linkBlob(ctx, desc) |
| } |
| |
// optionFunc adapts a plain function to a type that exposes an Apply method.
type optionFunc func(interface{}) error

// Apply calls f with v and returns f's result unchanged.
func (f optionFunc) Apply(v interface{}) error {
	err := f(v)
	return err
}
| |
| // WithMountFrom returns a BlobCreateOption which designates that the blob should be |
| // mounted from the given canonical reference. |
| func WithMountFrom(ref reference.Canonical) distribution.BlobCreateOption { |
| return optionFunc(func(v interface{}) error { |
| opts, ok := v.(*distribution.CreateOptions) |
| if !ok { |
| return fmt.Errorf("unexpected options type: %T", v) |
| } |
| |
| opts.Mount.ShouldMount = true |
| opts.Mount.From = ref |
| |
| return nil |
| }) |
| } |
| |
| // Writer begins a blob write session, returning a handle. |
| func (lbs *linkedBlobStore) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) { |
| dcontext.GetLogger(ctx).Debug("(*linkedBlobStore).Writer") |
| |
| var opts distribution.CreateOptions |
| |
| for _, option := range options { |
| err := option.Apply(&opts) |
| if err != nil { |
| return nil, err |
| } |
| } |
| |
| if opts.Mount.ShouldMount { |
| desc, err := lbs.mount(ctx, opts.Mount.From, opts.Mount.From.Digest(), opts.Mount.Stat) |
| if err == nil { |
| // Mount successful, no need to initiate an upload session |
| return nil, distribution.ErrBlobMounted{From: opts.Mount.From, Descriptor: desc} |
| } |
| } |
| |
| uuid := uuid.Generate().String() |
| startedAt := time.Now().UTC() |
| |
| path, err := pathFor(uploadDataPathSpec{ |
| name: lbs.repository.Named().Name(), |
| id: uuid, |
| }) |
| |
| if err != nil { |
| return nil, err |
| } |
| |
| startedAtPath, err := pathFor(uploadStartedAtPathSpec{ |
| name: lbs.repository.Named().Name(), |
| id: uuid, |
| }) |
| |
| if err != nil { |
| return nil, err |
| } |
| |
| // Write a startedat file for this upload |
| if err := lbs.blobStore.driver.PutContent(ctx, startedAtPath, []byte(startedAt.Format(time.RFC3339))); err != nil { |
| return nil, err |
| } |
| |
| return lbs.newBlobUpload(ctx, uuid, path, startedAt, false) |
| } |
| |
| func (lbs *linkedBlobStore) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) { |
| dcontext.GetLogger(ctx).Debug("(*linkedBlobStore).Resume") |
| |
| startedAtPath, err := pathFor(uploadStartedAtPathSpec{ |
| name: lbs.repository.Named().Name(), |
| id: id, |
| }) |
| |
| if err != nil { |
| return nil, err |
| } |
| |
| startedAtBytes, err := lbs.blobStore.driver.GetContent(ctx, startedAtPath) |
| if err != nil { |
| switch err := err.(type) { |
| case driver.PathNotFoundError: |
| return nil, distribution.ErrBlobUploadUnknown |
| default: |
| return nil, err |
| } |
| } |
| |
| startedAt, err := time.Parse(time.RFC3339, string(startedAtBytes)) |
| if err != nil { |
| return nil, err |
| } |
| |
| path, err := pathFor(uploadDataPathSpec{ |
| name: lbs.repository.Named().Name(), |
| id: id, |
| }) |
| |
| if err != nil { |
| return nil, err |
| } |
| |
| return lbs.newBlobUpload(ctx, id, path, startedAt, true) |
| } |
| |
| func (lbs *linkedBlobStore) Delete(ctx context.Context, dgst digest.Digest) error { |
| if !lbs.deleteEnabled { |
| return distribution.ErrUnsupported |
| } |
| |
| // Ensure the blob is available for deletion |
| _, err := lbs.blobAccessController.Stat(ctx, dgst) |
| if err != nil { |
| return err |
| } |
| |
| err = lbs.blobAccessController.Clear(ctx, dgst) |
| if err != nil { |
| return err |
| } |
| |
| return nil |
| } |
| |
| func (lbs *linkedBlobStore) Enumerate(ctx context.Context, ingestor func(digest.Digest) error) error { |
| rootPath, err := pathFor(lbs.linkDirectoryPathSpec) |
| if err != nil { |
| return err |
| } |
| return lbs.driver.Walk(ctx, rootPath, func(fileInfo driver.FileInfo) error { |
| // exit early if directory... |
| if fileInfo.IsDir() { |
| return nil |
| } |
| filePath := fileInfo.Path() |
| |
| // check if it's a link |
| _, fileName := path.Split(filePath) |
| if fileName != "link" { |
| return nil |
| } |
| |
| // read the digest found in link |
| digest, err := lbs.blobStore.readlink(ctx, filePath) |
| if err != nil { |
| return err |
| } |
| |
| // ensure this conforms to the linkPathFns |
| _, err = lbs.Stat(ctx, digest) |
| if err != nil { |
| // we expect this error to occur so we move on |
| if err == distribution.ErrBlobUnknown { |
| return nil |
| } |
| return err |
| } |
| |
| err = ingestor(digest) |
| if err != nil { |
| return err |
| } |
| |
| return nil |
| }) |
| } |
| |
| func (lbs *linkedBlobStore) mount(ctx context.Context, sourceRepo reference.Named, dgst digest.Digest, sourceStat *distribution.Descriptor) (distribution.Descriptor, error) { |
| var stat distribution.Descriptor |
| if sourceStat == nil { |
| // look up the blob info from the sourceRepo if not already provided |
| repo, err := lbs.registry.Repository(ctx, sourceRepo) |
| if err != nil { |
| return distribution.Descriptor{}, err |
| } |
| stat, err = repo.Blobs(ctx).Stat(ctx, dgst) |
| if err != nil { |
| return distribution.Descriptor{}, err |
| } |
| } else { |
| // use the provided blob info |
| stat = *sourceStat |
| } |
| |
| desc := distribution.Descriptor{ |
| Size: stat.Size, |
| |
| // NOTE(stevvooe): The central blob store firewalls media types from |
| // other users. The caller should look this up and override the value |
| // for the specific repository. |
| MediaType: "application/octet-stream", |
| Digest: dgst, |
| } |
| return desc, lbs.linkBlob(ctx, desc) |
| } |
| |
| // newBlobUpload allocates a new upload controller with the given state. |
| func (lbs *linkedBlobStore) newBlobUpload(ctx context.Context, uuid, path string, startedAt time.Time, append bool) (distribution.BlobWriter, error) { |
| fw, err := lbs.driver.Writer(ctx, path, append) |
| if err != nil { |
| return nil, err |
| } |
| |
| bw := &blobWriter{ |
| ctx: ctx, |
| blobStore: lbs, |
| id: uuid, |
| startedAt: startedAt, |
| digester: digest.Canonical.Digester(), |
| fileWriter: fw, |
| driver: lbs.driver, |
| path: path, |
| resumableDigestEnabled: lbs.resumableDigestEnabled, |
| } |
| |
| return bw, nil |
| } |
| |
| // linkBlob links a valid, written blob into the registry under the named |
| // repository for the upload controller. |
| func (lbs *linkedBlobStore) linkBlob(ctx context.Context, canonical distribution.Descriptor, aliases ...digest.Digest) error { |
| dgsts := append([]digest.Digest{canonical.Digest}, aliases...) |
| |
| // TODO(stevvooe): Need to write out mediatype for only canonical hash |
| // since we don't care about the aliases. They are generally unused except |
| // for tarsum but those versions don't care about mediatype. |
| |
| // Don't make duplicate links. |
| seenDigests := make(map[digest.Digest]struct{}, len(dgsts)) |
| |
| // only use the first link |
| linkPathFn := lbs.linkPathFns[0] |
| |
| for _, dgst := range dgsts { |
| if _, seen := seenDigests[dgst]; seen { |
| continue |
| } |
| seenDigests[dgst] = struct{}{} |
| |
| blobLinkPath, err := linkPathFn(lbs.repository.Named().Name(), dgst) |
| if err != nil { |
| return err |
| } |
| |
| if err := lbs.blobStore.link(ctx, blobLinkPath, canonical.Digest); err != nil { |
| return err |
| } |
| } |
| |
| return nil |
| } |
| |
| type linkedBlobStatter struct { |
| *blobStore |
| repository distribution.Repository |
| |
| // linkPathFns specifies one or more path functions allowing one to |
| // control the repository blob link set to which the blob store |
| // dispatches. This is required because manifest and layer blobs have not |
| // yet been fully merged. At some point, this functionality should be |
| // removed an the blob links folder should be merged. The first entry is |
| // treated as the "canonical" link location and will be used for writes. |
| linkPathFns []linkPathFunc |
| } |
| |
| var _ distribution.BlobDescriptorService = &linkedBlobStatter{} |
| |
| func (lbs *linkedBlobStatter) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) { |
| var ( |
| found bool |
| target digest.Digest |
| ) |
| |
| // try the many link path functions until we get success or an error that |
| // is not PathNotFoundError. |
| for _, linkPathFn := range lbs.linkPathFns { |
| var err error |
| target, err = lbs.resolveWithLinkFunc(ctx, dgst, linkPathFn) |
| |
| if err == nil { |
| found = true |
| break // success! |
| } |
| |
| switch err := err.(type) { |
| case driver.PathNotFoundError: |
| // do nothing, just move to the next linkPathFn |
| default: |
| return distribution.Descriptor{}, err |
| } |
| } |
| |
| if !found { |
| return distribution.Descriptor{}, distribution.ErrBlobUnknown |
| } |
| |
| if target != dgst { |
| // Track when we are doing cross-digest domain lookups. ie, sha512 to sha256. |
| dcontext.GetLogger(ctx).Warnf("looking up blob with canonical target: %v -> %v", dgst, target) |
| } |
| |
| // TODO(stevvooe): Look up repository local mediatype and replace that on |
| // the returned descriptor. |
| |
| return lbs.blobStore.statter.Stat(ctx, target) |
| } |
| |
| func (lbs *linkedBlobStatter) Clear(ctx context.Context, dgst digest.Digest) (err error) { |
| // clear any possible existence of a link described in linkPathFns |
| for _, linkPathFn := range lbs.linkPathFns { |
| blobLinkPath, err := linkPathFn(lbs.repository.Named().Name(), dgst) |
| if err != nil { |
| return err |
| } |
| |
| err = lbs.blobStore.driver.Delete(ctx, blobLinkPath) |
| if err != nil { |
| switch err := err.(type) { |
| case driver.PathNotFoundError: |
| continue // just ignore this error and continue |
| default: |
| return err |
| } |
| } |
| } |
| |
| return nil |
| } |
| |
| // resolveTargetWithFunc allows us to read a link to a resource with different |
| // linkPathFuncs to let us try a few different paths before returning not |
| // found. |
| func (lbs *linkedBlobStatter) resolveWithLinkFunc(ctx context.Context, dgst digest.Digest, linkPathFn linkPathFunc) (digest.Digest, error) { |
| blobLinkPath, err := linkPathFn(lbs.repository.Named().Name(), dgst) |
| if err != nil { |
| return "", err |
| } |
| |
| return lbs.blobStore.readlink(ctx, blobLinkPath) |
| } |
| |
| func (lbs *linkedBlobStatter) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error { |
| // The canonical descriptor for a blob is set at the commit phase of upload |
| return nil |
| } |
| |
| // blobLinkPath provides the path to the blob link, also known as layers. |
| func blobLinkPath(name string, dgst digest.Digest) (string, error) { |
| return pathFor(layerLinkPathSpec{name: name, digest: dgst}) |
| } |
| |
| // manifestRevisionLinkPath provides the path to the manifest revision link. |
| func manifestRevisionLinkPath(name string, dgst digest.Digest) (string, error) { |
| return pathFor(manifestRevisionLinkPathSpec{name: name, revision: dgst}) |
| } |