| // Licensed to the Apache Software Foundation (ASF) under one or more |
| // contributor license agreements. See the NOTICE file distributed with |
| // this work for additional information regarding copyright ownership. |
| // The ASF licenses this file to You under the Apache License, Version 2.0 |
| // (the "License"); you may not use this file except in compliance with |
| // the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package bufmodulecache |
| |
| import ( |
| "bytes" |
| "context" |
| "io" |
| "strings" |
| "testing" |
| "time" |
| ) |
| |
| import ( |
| "github.com/bufbuild/connect-go" |
| |
| "github.com/stretchr/testify/assert" |
| "github.com/stretchr/testify/require" |
| |
| "go.uber.org/zap/zaptest" |
| ) |
| |
| import ( |
| "github.com/apache/dubbo-kubernetes/pkg/bufman/bufpkg/bufmodule" |
| "github.com/apache/dubbo-kubernetes/pkg/bufman/bufpkg/bufmodule/bufmoduleref" |
| "github.com/apache/dubbo-kubernetes/pkg/bufman/gen/proto/connect/registry/v1alpha1/registryv1alpha1connect" |
| registryv1alpha1 "github.com/apache/dubbo-kubernetes/pkg/bufman/gen/proto/go/registry/v1alpha1" |
| "github.com/apache/dubbo-kubernetes/pkg/bufman/pkg/manifest" |
| "github.com/apache/dubbo-kubernetes/pkg/bufman/pkg/normalpath" |
| "github.com/apache/dubbo-kubernetes/pkg/bufman/pkg/storage" |
| "github.com/apache/dubbo-kubernetes/pkg/bufman/pkg/storage/storageos" |
| "github.com/apache/dubbo-kubernetes/pkg/bufman/pkg/verbose" |
| ) |
| |
| const pingProto = `syntax = "proto3"; |
| |
| package connect.ping.v1; |
| |
| message PingRequest { |
| int64 number = 1; |
| string text = 2; |
| } |
| |
| message PingResponse { |
| int64 number = 1; |
| string text = 2; |
| } |
| |
| service PingService { |
| rpc Ping(PingRequest) returns (PingResponse) {} |
| } |
| ` |
| |
| func TestCASModuleReaderHappyPath(t *testing.T) { |
| t.Parallel() |
| moduleManifest, blobs := createSampleManifestAndBlobs(t) |
| moduleBlob, err := moduleManifest.Blob() |
| require.NoError(t, err) |
| testModule, err := bufmodule.NewModuleForManifestAndBlobSet(context.Background(), moduleManifest, blobs) |
| require.NoError(t, err) |
| storageProvider := storageos.NewProvider() |
| storageBucket, err := storageProvider.NewReadWriteBucket(t.TempDir()) |
| require.NoError(t, err) |
| |
| moduleReader := newCASModuleReader(storageBucket, &testModuleReader{module: testModule}, func(_ string) registryv1alpha1connect.RepositoryServiceClient { |
| return &testRepositoryServiceClient{} |
| }, zaptest.NewLogger(t), &testVerbosePrinter{t: t}) |
| pin, err := bufmoduleref.NewModulePin( |
| "buf.build", |
| "test", |
| "ping", |
| "", |
| "abcd", |
| moduleBlob.Digest().String(), |
| time.Now(), |
| ) |
| require.NoError(t, err) |
| _, err = moduleReader.GetModule(context.Background(), pin) // non-cached |
| require.NoError(t, err) |
| assert.Equal(t, 1, moduleReader.stats.Count()) |
| assert.Equal(t, 0, moduleReader.stats.Hits()) |
| verifyCache(t, storageBucket, pin, moduleManifest, blobs) |
| |
| cachedMod, err := moduleReader.GetModule(context.Background(), pin) |
| require.NoError(t, err) |
| assertModuleIdentity(t, cachedMod, pin.IdentityString(), pin.Commit()) |
| assert.Equal(t, 2, moduleReader.stats.Count()) |
| assert.Equal(t, 1, moduleReader.stats.Hits()) // We should have a cache hit the second time |
| verifyCache(t, storageBucket, pin, moduleManifest, blobs) |
| } |
| |
| func TestCASModuleReaderNoDigest(t *testing.T) { |
| t.Parallel() |
| moduleManifest, blobs := createSampleManifestAndBlobs(t) |
| testModule, err := bufmodule.NewModuleForManifestAndBlobSet(context.Background(), moduleManifest, blobs) |
| require.NoError(t, err) |
| storageProvider := storageos.NewProvider() |
| storageBucket, err := storageProvider.NewReadWriteBucket(t.TempDir()) |
| require.NoError(t, err) |
| moduleReader := newCASModuleReader(storageBucket, &testModuleReader{module: testModule}, func(_ string) registryv1alpha1connect.RepositoryServiceClient { |
| return &testRepositoryServiceClient{} |
| }, zaptest.NewLogger(t), &testVerbosePrinter{t: t}) |
| pin, err := bufmoduleref.NewModulePin( |
| "buf.build", |
| "test", |
| "ping", |
| "", |
| "abcd", |
| "", |
| time.Now(), |
| ) |
| require.NoError(t, err) |
| _, err = moduleReader.GetModule(context.Background(), pin) |
| require.NoError(t, err) |
| assert.Equal(t, 1, moduleReader.stats.Count()) |
| assert.Equal(t, 0, moduleReader.stats.Hits()) |
| verifyCache(t, storageBucket, pin, moduleManifest, blobs) |
| } |
| |
| func TestCASModuleReaderDigestMismatch(t *testing.T) { |
| t.Parallel() |
| moduleManifest, blobs := createSampleManifestAndBlobs(t) |
| testModule, err := bufmodule.NewModuleForManifestAndBlobSet(context.Background(), moduleManifest, blobs) |
| require.NoError(t, err) |
| storageProvider := storageos.NewProvider() |
| storageBucket, err := storageProvider.NewReadWriteBucket(t.TempDir()) |
| require.NoError(t, err) |
| moduleReader := newCASModuleReader(storageBucket, &testModuleReader{module: testModule}, func(_ string) registryv1alpha1connect.RepositoryServiceClient { |
| return &testRepositoryServiceClient{} |
| }, zaptest.NewLogger(t), &testVerbosePrinter{t: t}) |
| pin, err := bufmoduleref.NewModulePin( |
| "buf.build", |
| "test", |
| "ping", |
| "", |
| "abcd", |
| "shake256:"+strings.Repeat("00", 64), // Digest which doesn't match module's digest |
| time.Now(), |
| ) |
| require.NoError(t, err) |
| _, err = moduleReader.GetModule(context.Background(), pin) |
| require.Error(t, err) |
| numFiles := 0 |
| err = storageBucket.Walk(context.Background(), "", func(info storage.ObjectInfo) error { |
| numFiles++ |
| return nil |
| }) |
| require.NoError(t, err) |
| assert.Equal(t, 0, numFiles) // Verify nothing written to cache on digest mismatch |
| } |
| |
| func verifyCache( |
| t *testing.T, |
| bucket storage.ReadWriteBucket, |
| pin bufmoduleref.ModulePin, |
| moduleManifest *manifest.Manifest, |
| blobs *manifest.BlobSet, |
| ) { |
| t.Helper() |
| ctx := context.Background() |
| moduleCacheDir := normalpath.Join(pin.Remote(), pin.Owner(), pin.Repository()) |
| // {remote}/{owner}/{repo}/manifests/{..}/{....} => should return manifest contents |
| moduleBlob, err := moduleManifest.Blob() |
| require.NoError(t, err) |
| verifyBlobContents(t, bucket, normalpath.Join(moduleCacheDir, blobsDir), moduleBlob) |
| for _, path := range moduleManifest.Paths() { |
| protoDigest, found := moduleManifest.DigestFor(path) |
| require.True(t, found) |
| protoBlob, found := blobs.BlobFor(protoDigest.String()) |
| require.True(t, found) |
| // {remote}/{owner}/{repo}/blobs/{..}/{....} => should return proto blob contents |
| verifyBlobContents(t, bucket, normalpath.Join(moduleCacheDir, blobsDir), protoBlob) |
| } |
| f, err := bucket.Get(ctx, normalpath.Join(moduleCacheDir, commitsDir, pin.Commit())) |
| require.NoError(t, err) |
| defer f.Close() |
| commitContents, err := io.ReadAll(f) |
| require.NoError(t, err) |
| // {remote}/{owner}/{repo}/commits/{commit} => should return digest string format |
| assert.Equal(t, []byte(moduleBlob.Digest().String()), commitContents) |
| } |
| |
| func createSampleManifestAndBlobs(t *testing.T) (*manifest.Manifest, *manifest.BlobSet) { |
| t.Helper() |
| blob, err := manifest.NewMemoryBlobFromReader(strings.NewReader(pingProto)) |
| require.NoError(t, err) |
| var moduleManifest manifest.Manifest |
| err = moduleManifest.AddEntry("connect/ping/v1/ping.proto", *blob.Digest()) |
| require.NoError(t, err) |
| blobSet, err := manifest.NewBlobSet(context.Background(), []manifest.Blob{blob}) |
| require.NoError(t, err) |
| return &moduleManifest, blobSet |
| } |
| |
| func verifyBlobContents(t *testing.T, bucket storage.ReadWriteBucket, basedir string, blob manifest.Blob) { |
| t.Helper() |
| moduleHexDigest := blob.Digest().Hex() |
| r, err := blob.Open(context.Background()) |
| require.NoError(t, err) |
| var bb bytes.Buffer |
| _, err = io.Copy(&bb, r) |
| require.NoError(t, err) |
| f, err := bucket.Get(context.Background(), normalpath.Join(basedir, moduleHexDigest[:2], moduleHexDigest[2:])) |
| require.NoError(t, err) |
| defer f.Close() |
| cachedModule, err := io.ReadAll(f) |
| require.NoError(t, err) |
| assert.Equal(t, bb.Bytes(), cachedModule) |
| } |
| |
| func assertModuleIdentity(t *testing.T, module bufmodule.Module, expectedModuleIdentity string, expectedCommit string) { |
| require.NotNil(t, module) |
| require.NotEmpty(t, expectedCommit) |
| fileInfos, err := module.SourceFileInfos(context.Background()) |
| require.NoError(t, err) |
| for _, fileInfo := range fileInfos { |
| require.NotNil(t, fileInfo.ModuleIdentity()) |
| assert.Equalf( |
| t, expectedModuleIdentity, fileInfo.ModuleIdentity().IdentityString(), |
| "unexpected module identity for file %q", fileInfo.Path(), |
| ) |
| assert.Equalf( |
| t, expectedCommit, fileInfo.Commit(), |
| "unexpected commit for file %q", fileInfo.Path(), |
| ) |
| } |
| } |
| |
| type testModuleReader struct { |
| module bufmodule.Module |
| } |
| |
| var _ bufmodule.ModuleReader = (*testModuleReader)(nil) |
| |
| func (t *testModuleReader) GetModule(_ context.Context, _ bufmoduleref.ModulePin) (bufmodule.Module, error) { |
| return t.module, nil |
| } |
| |
| type testRepositoryServiceClient struct { |
| registryv1alpha1connect.UnimplementedRepositoryServiceHandler |
| } |
| |
| var _ registryv1alpha1connect.RepositoryServiceClient = (*testRepositoryServiceClient)(nil) |
| |
| func (t *testRepositoryServiceClient) GetRepositoryByFullName( |
| _ context.Context, |
| _ *connect.Request[registryv1alpha1.GetRepositoryByFullNameRequest], |
| ) (*connect.Response[registryv1alpha1.GetRepositoryByFullNameResponse], error) { |
| return connect.NewResponse(®istryv1alpha1.GetRepositoryByFullNameResponse{ |
| Repository: ®istryv1alpha1.Repository{}, |
| }), nil |
| } |
| |
| type testVerbosePrinter struct { |
| t *testing.T |
| } |
| |
| var _ verbose.Printer = (*testVerbosePrinter)(nil) |
| |
| func (t testVerbosePrinter) Printf(format string, args ...interface{}) { |
| t.t.Logf(format, args...) |
| } |