| // Licensed to the Apache Software Foundation (ASF) under one or more |
| // contributor license agreements. See the NOTICE file distributed with |
| // this work for additional information regarding copyright ownership. |
| // The ASF licenses this file to You under the Apache License, Version 2.0 |
| // (the "License"); you may not use this file except in compliance with |
| // the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package services |
| |
| import ( |
| "context" |
| "errors" |
| "fmt" |
| "io" |
| "time" |
| |
| "github.com/apache/dubbo-kubernetes/pkg/bufman/core/security" |
| "github.com/apache/dubbo-kubernetes/pkg/bufman/core/storage" |
| "github.com/apache/dubbo-kubernetes/pkg/bufman/e" |
| "github.com/apache/dubbo-kubernetes/pkg/bufman/mapper" |
| "github.com/apache/dubbo-kubernetes/pkg/bufman/model" |
| manifest2 "github.com/apache/dubbo-kubernetes/pkg/bufman/pkg/manifest" |
| "github.com/google/uuid" |
| "gorm.io/gorm" |
| ) |
| |
| type PushService interface { |
| PushManifestAndBlobs(ctx context.Context, userID, ownerName, repositoryName string, fileManifest *manifest2.Manifest, fileBlobs *manifest2.BlobSet) (*model.Commit, e.ResponseError) |
| PushManifestAndBlobsWithTags(ctx context.Context, userID, ownerName, repositoryName string, fileManifest *manifest2.Manifest, fileBlobs *manifest2.BlobSet, tagNames []string) (*model.Commit, e.ResponseError) |
| PushManifestAndBlobsWithDraft(ctx context.Context, userID, ownerName, repositoryName string, fileManifest *manifest2.Manifest, fileBlobs *manifest2.BlobSet, draftName string) (*model.Commit, e.ResponseError) |
| GetManifestAndBlobSet(ctx context.Context, repositoryID string, reference string) (*manifest2.Manifest, *manifest2.BlobSet, e.ResponseError) |
| } |
| |
| type PushServiceImpl struct { |
| userMapper mapper.UserMapper |
| repositoryMapper mapper.RepositoryMapper |
| fileMapper mapper.FileMapper |
| commitMapper mapper.CommitMapper |
| tagMapper mapper.TagMapper |
| storageHelper storage.StorageHelper |
| } |
| |
| func NewPushService() PushService { |
| return &PushServiceImpl{ |
| userMapper: &mapper.UserMapperImpl{}, |
| repositoryMapper: &mapper.RepositoryMapperImpl{}, |
| commitMapper: &mapper.CommitMapperImpl{}, |
| tagMapper: &mapper.TagMapperImpl{}, |
| fileMapper: &mapper.FileMapperImpl{}, |
| storageHelper: storage.NewStorageHelper(), |
| } |
| } |
| |
| func (pushService *PushServiceImpl) GetManifestAndBlobSet(ctx context.Context, repositoryID string, reference string) (*manifest2.Manifest, *manifest2.BlobSet, e.ResponseError) { |
| // 查询reference对应的commit |
| commit, err := pushService.commitMapper.FindByRepositoryIDAndReference(repositoryID, reference) |
| if err != nil { |
| if errors.Is(err, gorm.ErrRecordNotFound) { |
| return nil, nil, e.NewNotFoundError(fmt.Errorf("repository %s", repositoryID)) |
| } |
| |
| return nil, nil, e.NewInternalError(err) |
| } |
| |
| // 查询文件清单 |
| modelFileManifest, err := pushService.fileMapper.FindCommitManifestByCommitID(commit.CommitID) |
| if err != nil { |
| if err != nil { |
| return nil, nil, e.NewInternalError(err) |
| } |
| } |
| |
| // 接着查询blobs |
| fileBlobs, err := pushService.fileMapper.FindCommitFilesExceptManifestByCommitID(commit.CommitID) |
| if err != nil { |
| return nil, nil, e.NewInternalError(err) |
| } |
| |
| // 读取 |
| fileManifest, blobSet, err := pushService.storageHelper.ReadToManifestAndBlobSet(ctx, modelFileManifest, fileBlobs) |
| if err != nil { |
| return nil, nil, e.NewInternalError(err) |
| } |
| |
| return fileManifest, blobSet, nil |
| } |
| |
| func (pushService *PushServiceImpl) PushManifestAndBlobs(ctx context.Context, userID, ownerName, repositoryName string, fileManifest *manifest2.Manifest, fileBlobs *manifest2.BlobSet) (*model.Commit, e.ResponseError) { |
| commit, err := pushService.toCommit(ctx, userID, ownerName, repositoryName, fileManifest, fileBlobs) |
| if err != nil { |
| return nil, err |
| } |
| |
| // 写入文件 |
| err = pushService.saveFileManifestAndBlobs(ctx, commit) |
| if err != nil { |
| return nil, err |
| } |
| |
| // 写入数据库 |
| createErr := pushService.commitMapper.Create(commit) |
| if createErr != nil { |
| if errors.Is(createErr, gorm.ErrDuplicatedKey) { |
| return nil, e.NewInternalError(createErr) |
| } |
| if errors.Is(createErr, mapper.ErrLastCommitDuplicated) { |
| return nil, e.NewAlreadyExistsError(createErr) |
| } |
| |
| return nil, e.NewInternalError(createErr) |
| } |
| |
| return commit, nil |
| } |
| |
| func (pushService *PushServiceImpl) PushManifestAndBlobsWithTags(ctx context.Context, userID, ownerName, repositoryName string, fileManifest *manifest2.Manifest, fileBlobs *manifest2.BlobSet, tagNames []string) (*model.Commit, e.ResponseError) { |
| commit, err := pushService.toCommit(ctx, userID, ownerName, repositoryName, fileManifest, fileBlobs) |
| if err != nil { |
| return nil, err |
| } |
| |
| // 生成tags |
| var tags []*model.Tag |
| for i := 0; i < len(tagNames); i++ { |
| tags = append(tags, &model.Tag{ |
| UserID: commit.UserID, |
| RepositoryID: commit.RepositoryID, |
| CommitID: commit.CommitID, |
| TagID: uuid.NewString(), |
| TagName: tagNames[i], |
| }) |
| } |
| commit.Tags = tags |
| |
| // 写入文件 |
| err = pushService.saveFileManifestAndBlobs(ctx, commit) |
| if err != nil { |
| return nil, err |
| } |
| |
| // 写入commit |
| createErr := pushService.commitMapper.Create(commit) |
| if createErr != nil { |
| if errors.Is(createErr, mapper.ErrTagAndDraftDuplicated) || errors.Is(createErr, gorm.ErrDuplicatedKey) { |
| return nil, e.NewInternalError(createErr) |
| } |
| if errors.Is(createErr, mapper.ErrLastCommitDuplicated) { |
| return nil, e.NewAlreadyExistsError(createErr) |
| } |
| |
| return nil, e.NewInternalError(createErr) |
| } |
| |
| // 写入tag |
| createErr = pushService.tagMapper.CreateInBatch(tags...) |
| if createErr != nil { |
| if errors.Is(createErr, mapper.ErrTagAndDraftDuplicated) || errors.Is(createErr, gorm.ErrDuplicatedKey) { |
| return nil, e.NewInternalError(createErr) |
| } |
| if errors.Is(createErr, mapper.ErrLastCommitDuplicated) { |
| return nil, e.NewAlreadyExistsError(createErr) |
| } |
| |
| return nil, e.NewInternalError(createErr) |
| } |
| |
| return commit, nil |
| } |
| |
| func (pushService *PushServiceImpl) PushManifestAndBlobsWithDraft(ctx context.Context, userID, ownerName, repositoryName string, fileManifest *manifest2.Manifest, fileBlobs *manifest2.BlobSet, draftName string) (*model.Commit, e.ResponseError) { |
| commit, err := pushService.toCommit(ctx, userID, ownerName, repositoryName, fileManifest, fileBlobs) |
| if err != nil { |
| return nil, err |
| } |
| commit.DraftName = draftName |
| |
| // 写入文件 |
| err = pushService.saveFileManifestAndBlobs(ctx, commit) |
| if err != nil { |
| return nil, err |
| } |
| |
| createErr := pushService.commitMapper.Create(commit) |
| if createErr != nil { |
| if errors.Is(createErr, mapper.ErrTagAndDraftDuplicated) { |
| return nil, e.NewInternalError(createErr) |
| } |
| if errors.Is(createErr, mapper.ErrLastCommitDuplicated) { |
| return nil, e.NewAlreadyExistsError(createErr) |
| } |
| |
| return nil, e.NewInternalError(createErr) |
| } |
| |
| return commit, nil |
| } |
| |
| func (pushService *PushServiceImpl) toCommit(ctx context.Context, userID, ownerName, repositoryName string, fileManifest *manifest2.Manifest, fileBlobs *manifest2.BlobSet) (*model.Commit, e.ResponseError) { |
| // 获取user |
| user, err := pushService.userMapper.FindByUserID(userID) |
| if err != nil || user.UserName != ownerName { |
| return nil, e.NewPermissionDeniedError(err) |
| } |
| |
| // 获取repo |
| repository, err := pushService.repositoryMapper.FindByUserNameAndRepositoryName(ownerName, repositoryName) |
| if err != nil { |
| return nil, e.NewNotFoundError(err) |
| } |
| |
| commitID := uuid.NewString() |
| commitName := security.GenerateCommitName(user.UserName, repositoryName) |
| createTime := time.Now() |
| |
| // 生成file blobs |
| modelBlobs := make([]*model.CommitFile, 0, len(fileManifest.Paths())) |
| err = fileManifest.Range(func(path string, digest manifest2.Digest) error { |
| // 读取文件内容 |
| blob, ok := fileBlobs.BlobFor(digest.String()) |
| if !ok { |
| return e.NewInvalidArgumentError(fmt.Errorf("blob is not valid")) |
| } |
| |
| readCloser, err := blob.Open(ctx) |
| if err != nil { |
| return e.NewInternalError(err) |
| } |
| |
| content, err := io.ReadAll(readCloser) |
| if err != nil { |
| return e.NewInternalError(err) |
| } |
| |
| modelBlobs = append(modelBlobs, &model.CommitFile{ |
| Digest: digest.Hex(), |
| CommitID: commitID, |
| FileName: path, |
| Content: content, |
| UserID: user.UserID, |
| UserName: user.UserName, |
| RepositoryID: repository.RepositoryID, |
| RepositoryName: repository.RepositoryName, |
| CommitName: commitName, |
| CreatedTime: createTime, |
| }) |
| return nil |
| }) |
| if err != nil { |
| return nil, e.NewInternalError(err) |
| } |
| |
| // 生成manifest |
| fileManifestBlob, err := fileManifest.Blob() |
| if err != nil { |
| return nil, e.NewInternalError(err) |
| } |
| readCloser, err := fileManifestBlob.Open(ctx) |
| if err != nil { |
| return nil, e.NewInternalError(err) |
| } |
| content, err := io.ReadAll(readCloser) |
| if err != nil { |
| return nil, e.NewInternalError(err) |
| } |
| modelFileManifest := &model.CommitFile{ |
| ID: 0, |
| Digest: fileManifestBlob.Digest().Hex(), |
| CommitID: commitID, |
| Content: content, |
| UserID: user.UserID, |
| UserName: user.UserName, |
| RepositoryID: repository.RepositoryID, |
| RepositoryName: repository.RepositoryName, |
| CommitName: commitName, |
| CreatedTime: createTime, |
| } |
| |
| // 获取bufman config blob |
| configBlob, err := pushService.storageHelper.GetBufManConfigFromBlob(ctx, fileManifest, fileBlobs) |
| if err != nil { |
| return nil, e.NewInternalError(err) |
| } |
| // 获取README LICENSE |
| documentBlob, licenseBlob, err := pushService.storageHelper.GetDocumentAndLicenseFromBlob(ctx, fileManifest, fileBlobs) |
| if err != nil { |
| return nil, e.NewInternalError(err) |
| } |
| |
| commit := &model.Commit{ |
| UserID: user.UserID, |
| UserName: user.UserName, |
| RepositoryID: repository.RepositoryID, |
| RepositoryName: repositoryName, |
| CommitID: commitID, |
| CommitName: commitName, |
| CreatedTime: createTime, |
| ManifestDigest: fileManifestBlob.Digest().Hex(), |
| SequenceID: 0, |
| CommitManifest: modelFileManifest, |
| CommitFiles: modelBlobs, |
| } |
| if configBlob != nil { |
| commit.BufManConfigDigest = configBlob.Digest().Hex() |
| } |
| if documentBlob != nil { |
| commit.DocumentDigest = documentBlob.Digest().Hex() |
| } |
| if licenseBlob != nil { |
| commit.LicenseDigest = licenseBlob.Digest().Hex() |
| } |
| |
| return commit, nil |
| } |
| |
| func (pushService *PushServiceImpl) saveFileManifestAndBlobs(ctx context.Context, commit *model.Commit) e.ResponseError { |
| // 保存file blobs |
| for i := 0; i < len(commit.CommitFiles); i++ { |
| fileBlob := commit.CommitFiles[i] |
| |
| // 如果是README文件 |
| if fileBlob.Digest == commit.DocumentDigest { |
| err := pushService.storageHelper.StoreDocumentation(ctx, fileBlob) |
| if err != nil { |
| return e.NewInternalError(err) |
| } |
| |
| } |
| |
| // 普通文件 |
| err := pushService.storageHelper.StoreBlob(ctx, fileBlob) |
| if err != nil { |
| return e.NewInternalError(err) |
| } |
| } |
| |
| // 保存file manifest |
| err := pushService.storageHelper.StoreManifest(ctx, commit.CommitManifest) |
| if err != nil { |
| return e.NewInternalError(err) |
| } |
| |
| return nil |
| } |