blob: c1011a5ed4ac4107b6465fba4ff26a0a22572457 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storageos
import (
"context"
"errors"
"os"
"path/filepath"
"strings"
)
import (
"go.uber.org/atomic"
"go.uber.org/multierr"
)
import (
"github.com/apache/dubbo-kubernetes/pkg/bufman/pkg/filepathextended"
"github.com/apache/dubbo-kubernetes/pkg/bufman/pkg/normalpath"
"github.com/apache/dubbo-kubernetes/pkg/bufman/pkg/storage"
"github.com/apache/dubbo-kubernetes/pkg/bufman/pkg/storage/storageutil"
)
// errNotDir is the error returned if a path is not a directory.
var errNotDir = errors.New("not a directory")
type bucket struct {
rootPath string
absoluteRootPath string
symlinks bool
}
func newBucket(rootPath string, symlinks bool) (*bucket, error) {
rootPath = normalpath.Unnormalize(rootPath)
if err := validateDirPathExists(rootPath, symlinks); err != nil {
return nil, err
}
absoluteRootPath, err := filepath.Abs(rootPath)
if err != nil {
return nil, err
}
// do not validate - allow anything with OS buckets including
// absolute paths and jumping context
rootPath = normalpath.Normalize(rootPath)
return &bucket{
rootPath: rootPath,
absoluteRootPath: absoluteRootPath,
symlinks: symlinks,
}, nil
}
func (b *bucket) Get(ctx context.Context, path string) (storage.ReadObjectCloser, error) {
externalPath, err := b.getExternalPath(path)
if err != nil {
return nil, err
}
if err := b.validateExternalPath(path, externalPath); err != nil {
return nil, err
}
resolvedPath := externalPath
if b.symlinks {
resolvedPath, err = filepath.EvalSymlinks(externalPath)
if err != nil {
return nil, err
}
}
file, err := os.Open(resolvedPath)
if err != nil {
return nil, err
}
// we could use fileInfo.Name() however we might as well use the externalPath
return newReadObjectCloser(
path,
externalPath,
file,
), nil
}
func (b *bucket) Stat(ctx context.Context, path string) (storage.ObjectInfo, error) {
externalPath, err := b.getExternalPath(path)
if err != nil {
return nil, err
}
if err := b.validateExternalPath(path, externalPath); err != nil {
return nil, err
}
// we could use fileInfo.Name() however we might as well use the externalPath
return storageutil.NewObjectInfo(
path,
externalPath,
), nil
}
func (b *bucket) Walk(
ctx context.Context,
prefix string,
f func(storage.ObjectInfo) error,
) error {
externalPrefix, err := b.getExternalPrefix(prefix)
if err != nil {
return err
}
walkChecker := storageutil.NewWalkChecker()
var walkOptions []filepathextended.WalkOption
if b.symlinks {
walkOptions = append(walkOptions, filepathextended.WalkWithSymlinks())
}
if err := filepathextended.Walk(
externalPrefix,
func(externalPath string, fileInfo os.FileInfo, err error) error {
if err != nil {
// this can happen if a symlink is broken
// in this case, we just want to continue the walk
if b.symlinks && os.IsNotExist(err) {
return nil
}
return err
}
if err := walkChecker.Check(ctx); err != nil {
return err
}
absoluteExternalPath, err := filepath.Abs(externalPath)
if err != nil {
return err
}
if fileInfo.Mode().IsRegular() {
path, err := normalpath.Rel(b.absoluteRootPath, absoluteExternalPath)
if err != nil {
return err
}
// just in case
path, err = normalpath.NormalizeAndValidate(path)
if err != nil {
return err
}
if err := f(
storageutil.NewObjectInfo(
path,
externalPath,
),
); err != nil {
return err
}
}
return nil
},
walkOptions...,
); err != nil {
if os.IsNotExist(err) {
// Should be a no-op according to the spec.
return nil
}
return err
}
return nil
}
func (b *bucket) Put(ctx context.Context, path string, opts ...storage.PutOption) (storage.WriteObjectCloser, error) {
var putOptions storage.PutOptions
for _, opt := range opts {
opt(&putOptions)
}
externalPath, err := b.getExternalPath(path)
if err != nil {
return nil, err
}
externalDir := filepath.Dir(externalPath)
var fileInfo os.FileInfo
if b.symlinks {
fileInfo, err = os.Stat(externalDir)
} else {
fileInfo, err = os.Lstat(externalDir)
}
if err != nil {
if os.IsNotExist(err) {
if err := os.MkdirAll(externalDir, 0o755); err != nil {
return nil, err
}
} else {
return nil, err
}
} else if !fileInfo.IsDir() {
return nil, newErrNotDir(externalDir)
}
var file *os.File
var finalPath string
if putOptions.Atomic {
file, err = os.CreateTemp(externalDir, ".tmp"+filepath.Base(externalPath)+"*")
finalPath = externalPath
} else {
file, err = os.Create(externalPath)
}
if err != nil {
return nil, err
}
return newWriteObjectCloser(
file,
finalPath,
), nil
}
func (b *bucket) Delete(ctx context.Context, path string) error {
externalPath, err := b.getExternalPath(path)
if err != nil {
return err
}
// Note: this deletes the file at the path, but it may
// leave orphan parent directories around that were
// created by the MkdirAll in Put.
if err := os.Remove(externalPath); err != nil {
if os.IsNotExist(err) {
return storage.NewErrNotExist(path)
}
return err
}
return nil
}
func (b *bucket) DeleteAll(ctx context.Context, prefix string) error {
externalPrefix, err := b.getExternalPrefix(prefix)
if err != nil {
return err
}
if err := os.RemoveAll(externalPrefix); err != nil {
// this is a no-nop per the documentation
if os.IsNotExist(err) {
return nil
}
return err
}
return nil
}
func (*bucket) SetExternalPathSupported() bool {
return false
}
func (b *bucket) getExternalPath(path string) (string, error) {
path, err := storageutil.ValidatePath(path)
if err != nil {
return "", err
}
realClean, err := filepathextended.RealClean(normalpath.Join(b.rootPath, path))
if err != nil {
return "", err
}
return normalpath.Unnormalize(realClean), nil
}
func (b *bucket) validateExternalPath(path string, externalPath string) error {
// this is potentially introducing two calls to a file
// instead of one, ie we do both Stat and Open as opposed to just Open
// we do this to make sure we are only reading regular files
var fileInfo os.FileInfo
var err error
if b.symlinks {
fileInfo, err = os.Stat(externalPath)
} else {
fileInfo, err = os.Lstat(externalPath)
}
if err != nil {
if os.IsNotExist(err) {
return storage.NewErrNotExist(path)
}
// The path might have a regular file in one of its
// elements (e.g. 'foo/bar/baz.proto' where 'bar' is a
// regular file).
//
// In this case, the standard library will return an
// os.PathError, but there isn't an exported error value
// to check against (i.e. os.Is*). But we can still discover
// whether or not this is the case by checking if any of the
// path components represents a regular file (e.g. 'foo/bar').
//
// It's important that we detect these cases so that
// multi buckets don't unnecessarily fail when one of
// its delegates actually defines the path.
elements := strings.Split(normalpath.Normalize(externalPath), "/")
if len(elements) == 1 {
// The path is a single element, so there aren't
// any other files to check.
return err
}
for i := len(elements) - 1; i >= 0; i-- {
parentFileInfo, err := os.Stat(filepath.Join(elements[:i]...))
if err != nil {
continue
}
if parentFileInfo.Mode().IsRegular() {
// This error primarily serves as a sentinel error,
// but we preserve the original path argument so that
// the error still makes sense to the user.
return storage.NewErrNotExist(path)
}
}
return err
}
if !fileInfo.Mode().IsRegular() {
// making this a user error as any access means this was generally requested
// by the user, since we only call the function for Walk on regular files
return storage.NewErrNotExist(path)
}
return nil
}
func (b *bucket) getExternalPrefix(prefix string) (string, error) {
prefix, err := storageutil.ValidatePrefix(prefix)
if err != nil {
return "", err
}
realClean, err := filepathextended.RealClean(normalpath.Join(b.rootPath, prefix))
if err != nil {
return "", err
}
return normalpath.Unnormalize(realClean), nil
}
type readObjectCloser struct {
// we use ObjectInfo for Path, ExternalPath, etc to make sure this is static
// we put ObjectInfos in maps in other places so we do not want this to change
// this could be a problem if the underlying file is concurrently moved or resized however
storageutil.ObjectInfo
file *os.File
}
func newReadObjectCloser(
path string,
externalPath string,
file *os.File,
) *readObjectCloser {
return &readObjectCloser{
ObjectInfo: storageutil.NewObjectInfo(
path,
externalPath,
),
file: file,
}
}
func (r *readObjectCloser) Read(p []byte) (int, error) {
n, err := r.file.Read(p)
return n, toStorageError(err)
}
func (r *readObjectCloser) Close() error {
return toStorageError(r.file.Close())
}
type writeObjectCloser struct {
file *os.File
// path is set during atomic writes to the final path where the file should be created.
// If set, the file is a temp file that needs to be renamed to this path if Write/Close are successful.
path string
// writeErr contains the first non-nil error caught by a call to Write.
// This is returned in Close for atomic writes to prevent writing an incomplete file.
writeErr atomic.Error
}
func newWriteObjectCloser(
file *os.File,
path string,
) *writeObjectCloser {
return &writeObjectCloser{
file: file,
path: path,
}
}
func (w *writeObjectCloser) Write(p []byte) (int, error) {
n, err := w.file.Write(p)
if err != nil {
w.writeErr.CompareAndSwap(nil, err)
}
return n, toStorageError(err)
}
func (w *writeObjectCloser) SetExternalPath(string) error {
return storage.ErrSetExternalPathUnsupported
}
func (w *writeObjectCloser) Close() error {
err := toStorageError(w.file.Close())
// This is an atomic write operation - we need to rename to the final path
if w.path != "" {
atomicWriteErr := multierr.Append(w.writeErr.Load(), err)
// Failed during Write or Close - remove temporary file without rename
if atomicWriteErr != nil {
return toStorageError(multierr.Append(atomicWriteErr, os.Remove(w.file.Name())))
}
if err := os.Rename(w.file.Name(), w.path); err != nil {
return toStorageError(multierr.Append(err, os.Remove(w.file.Name())))
}
}
return err
}
// newErrNotDir returns a new Error for a path not being a directory.
func newErrNotDir(path string) *normalpath.Error {
return normalpath.NewError(path, errNotDir)
}
func toStorageError(err error) error {
if errors.Is(err, os.ErrClosed) {
return storage.ErrClosed
}
return err
}
// validateDirPathExists returns a non-nil error if the given dirPath
// is not a valid directory path.
func validateDirPathExists(dirPath string, symlinks bool) error {
var fileInfo os.FileInfo
var err error
if symlinks {
fileInfo, err = os.Stat(dirPath)
} else {
fileInfo, err = os.Lstat(dirPath)
}
if err != nil {
if os.IsNotExist(err) {
return storage.NewErrNotExist(dirPath)
}
return err
}
if !fileInfo.IsDir() {
return newErrNotDir(dirPath)
}
return nil
}