blob: 2695c52192b4067d9b6350f6ebdceff6d6097ca5 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"context"
"errors"
"fmt"
"io"
)
import (
"github.com/apache/dubbo-kubernetes/pkg/bufman/pkg/normalpath"
"github.com/apache/dubbo-kubernetes/pkg/bufman/pkg/storage/storageutil"
)
// MapReadBucket maps the ReadBucket.
//
// If the Mappers are empty, the original ReadBucket is returned.
// If there is more than one Mapper, the Mappers are called in order
// for UnmapFullPath, with the order reversed for MapPath and MapPrefix.
//
// That is, order these assuming you are starting with a full path and
// working to a path.
func MapReadBucket(readBucket ReadBucket, mappers ...Mapper) ReadBucket {
if len(mappers) == 0 {
return readBucket
}
return newMapReadBucket(readBucket, MapChain(mappers...))
}
// MapWriteBucket maps the WriteBucket.
//
// If the Mappers are empty, the original WriteBucket is returned.
// If there is more than one Mapper, the Mappers are called in order
// for UnmapFullPath, with the order reversed for MapPath and MapPrefix.
//
// That is, order these assuming you are starting with a full path and
// working to a path.
//
// If a path that does not match is called for Put, an error is returned.
func MapWriteBucket(writeBucket WriteBucket, mappers ...Mapper) WriteBucket {
if len(mappers) == 0 {
return writeBucket
}
return newMapWriteBucket(writeBucket, MapChain(mappers...))
}
// MapReadWriteBucket maps the ReadWriteBucket.
//
// If the Mappers are empty, the original ReadWriteBucket is returned.
// If there is more than one Mapper, the Mappers are called in order
// for UnmapFullPath, with the order reversed for MapPath and MapPrefix.
//
// That is, order these assuming you are starting with a full path and
// working to a path.
func MapReadWriteBucket(readWriteBucket ReadWriteBucket, mappers ...Mapper) ReadWriteBucket {
if len(mappers) == 0 {
return readWriteBucket
}
mapper := MapChain(mappers...)
return compositeReadWriteBucket{
newMapReadBucket(readWriteBucket, mapper),
newMapWriteBucket(readWriteBucket, mapper),
}
}
type mapReadBucket struct {
delegate ReadBucket
mapper Mapper
}
func newMapReadBucket(
delegate ReadBucket,
mapper Mapper,
) *mapReadBucket {
return &mapReadBucket{
delegate: delegate,
mapper: mapper,
}
}
func (r *mapReadBucket) Get(ctx context.Context, path string) (ReadObjectCloser, error) {
fullPath, err := r.getFullPath(path)
if err != nil {
return nil, err
}
readObjectCloser, err := r.delegate.Get(ctx, fullPath)
// TODO: if this is a path error, we should replace the path
if err != nil {
return nil, err
}
return replaceReadObjectCloserPath(readObjectCloser, path), nil
}
func (r *mapReadBucket) Stat(ctx context.Context, path string) (ObjectInfo, error) {
fullPath, err := r.getFullPath(path)
if err != nil {
return nil, err
}
objectInfo, err := r.delegate.Stat(ctx, fullPath)
// TODO: if this is a path error, we should replace the path
if err != nil {
return nil, err
}
return replaceObjectInfoPath(objectInfo, path), nil
}
func (r *mapReadBucket) Walk(ctx context.Context, prefix string, f func(ObjectInfo) error) error {
prefix, err := normalpath.NormalizeAndValidate(prefix)
if err != nil {
return err
}
fullPrefix, matches := r.mapper.MapPrefix(prefix)
if !matches {
return nil
}
return r.delegate.Walk(
ctx,
fullPrefix,
func(objectInfo ObjectInfo) error {
path, matches, err := r.mapper.UnmapFullPath(objectInfo.Path())
if err != nil {
return err
}
if !matches {
return nil
}
return f(replaceObjectInfoPath(objectInfo, path))
},
)
}
func (r *mapReadBucket) getFullPath(path string) (string, error) {
path, err := normalpath.NormalizeAndValidate(path)
if err != nil {
return "", err
}
if path == "." {
return "", errors.New("cannot get root")
}
fullPath, matches := r.mapper.MapPath(path)
if !matches {
return "", NewErrNotExist(path)
}
return fullPath, nil
}
type mapWriteBucket struct {
delegate WriteBucket
mapper Mapper
}
func newMapWriteBucket(
delegate WriteBucket,
mapper Mapper,
) *mapWriteBucket {
return &mapWriteBucket{
delegate: delegate,
mapper: mapper,
}
}
func (w *mapWriteBucket) Put(ctx context.Context, path string, opts ...PutOption) (WriteObjectCloser, error) {
fullPath, err := w.getFullPath(path)
if err != nil {
return nil, err
}
writeObjectCloser, err := w.delegate.Put(ctx, fullPath, opts...)
// TODO: if this is a path error, we should replace the path
if err != nil {
return nil, err
}
return replaceWriteObjectCloserExternalPathNotSupported(writeObjectCloser), nil
}
func (w *mapWriteBucket) Delete(ctx context.Context, path string) error {
fullPath, err := w.getFullPath(path)
if err != nil {
return err
}
return w.delegate.Delete(ctx, fullPath)
}
func (w *mapWriteBucket) DeleteAll(ctx context.Context, prefix string) error {
prefix, err := normalpath.NormalizeAndValidate(prefix)
if err != nil {
return err
}
fullPrefix, matches := w.mapper.MapPrefix(prefix)
if !matches {
return nil
}
return w.delegate.DeleteAll(ctx, fullPrefix)
}
func (*mapWriteBucket) SetExternalPathSupported() bool {
return false
}
func (w *mapWriteBucket) getFullPath(path string) (string, error) {
path, err := normalpath.NormalizeAndValidate(path)
if err != nil {
return "", err
}
if path == "." {
return "", errors.New("cannot get root")
}
fullPath, matches := w.mapper.MapPath(path)
if !matches {
return "", fmt.Errorf("path does not match: %s", path)
}
return fullPath, nil
}
func replaceObjectInfoPath(objectInfo ObjectInfo, path string) ObjectInfo {
if objectInfo.Path() == path {
return objectInfo
}
return storageutil.NewObjectInfo(
path,
objectInfo.ExternalPath(),
)
}
func replaceReadObjectCloserPath(readObjectCloser ReadObjectCloser, path string) ReadObjectCloser {
if readObjectCloser.Path() == path {
return readObjectCloser
}
return compositeReadObjectCloser{replaceObjectInfoPath(readObjectCloser, path), readObjectCloser}
}
func replaceWriteObjectCloserExternalPathNotSupported(writeObjectCloser WriteObjectCloser) WriteObjectCloser {
return writeObjectCloserExternalPathNotSuppoted{writeObjectCloser}
}
type writeObjectCloserExternalPathNotSuppoted struct {
io.WriteCloser
}
func (writeObjectCloserExternalPathNotSuppoted) SetExternalPath(string) error {
return ErrSetExternalPathUnsupported
}