blob: 8adf652f4adcd076b8d306b3b223171346cf1092 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package manager_test
import (
"context"
"sync"
"sync/atomic"
"time"
)
import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
import (
mesh_proto "github.com/apache/dubbo-kubernetes/api/mesh/v1alpha1"
core_mesh "github.com/apache/dubbo-kubernetes/pkg/core/resources/apis/mesh"
core_manager "github.com/apache/dubbo-kubernetes/pkg/core/resources/manager"
core_model "github.com/apache/dubbo-kubernetes/pkg/core/resources/model"
core_store "github.com/apache/dubbo-kubernetes/pkg/core/resources/store"
"github.com/apache/dubbo-kubernetes/pkg/plugins/resources/memory"
"github.com/apache/dubbo-kubernetes/pkg/test"
. "github.com/apache/dubbo-kubernetes/pkg/test/matchers"
)
type countingResourcesManager struct {
store core_store.ResourceStore
getQueries uint32
listQueries uint32
}
func (c *countingResourcesManager) Get(ctx context.Context, res core_model.Resource, fn ...core_store.GetOptionsFunc) error {
atomic.AddUint32(&c.getQueries, 1)
return c.store.Get(ctx, res, fn...)
}
func (c *countingResourcesManager) List(ctx context.Context, list core_model.ResourceList, fn ...core_store.ListOptionsFunc) error {
opts := core_store.NewListOptions(fn...)
if list.GetItemType() == core_mesh.DataplaneType && opts.Mesh == "slow" {
time.Sleep(10 * time.Second)
}
atomic.AddUint32(&c.listQueries, 1)
return c.store.List(ctx, list, fn...)
}
var _ core_manager.ReadOnlyResourceManager = &countingResourcesManager{}
var _ = Describe("Cached Resource Manager", func() {
var store core_store.ResourceStore
var cachedManager core_manager.ReadOnlyResourceManager
var countingManager *countingResourcesManager
var res *core_mesh.DataplaneResource
expiration := 500 * time.Millisecond
BeforeEach(func() {
// given
var err error
store = memory.NewStore()
countingManager = &countingResourcesManager{
store: store,
}
Expect(err).ToNot(HaveOccurred())
cachedManager, err = core_manager.NewCachedManager(countingManager, expiration)
Expect(err).ToNot(HaveOccurred())
// and created resources
res = &core_mesh.DataplaneResource{
Spec: &mesh_proto.Dataplane{
Networking: &mesh_proto.Dataplane_Networking{
Address: "127.0.0.1",
Inbound: []*mesh_proto.Dataplane_Networking_Inbound{
{
Port: 80,
ServicePort: 8080,
},
},
},
},
}
err = store.Create(context.Background(), res, core_store.CreateByKey("dp-1", "default"))
Expect(err).ToNot(HaveOccurred())
})
It("should cache Get() queries", func() {
// when fetched resources multiple times
fetch := func() *core_mesh.DataplaneResource {
fetched := core_mesh.NewDataplaneResource()
err := cachedManager.Get(context.Background(), fetched, core_store.GetByKey("dp-1", "default"))
Expect(err).ToNot(HaveOccurred())
return fetched
}
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
fetch()
wg.Done()
}()
}
wg.Wait()
// then real manager should be called only once
Expect(fetch().Spec).To(MatchProto(res.Spec))
Expect(int(countingManager.getQueries)).To(Equal(1))
// when
time.Sleep(expiration)
// then
Expect(fetch().Spec).To(MatchProto(res.Spec))
Expect(int(countingManager.getQueries)).To(Equal(2))
})
It("should not cache Get() not found", func() {
// when fetched resources multiple times
fetch := func() {
_ = cachedManager.Get(context.Background(), core_mesh.NewDataplaneResource(), core_store.GetByKey("non-existing", "default"))
}
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
fetch()
wg.Done()
}()
}
wg.Wait()
// then real manager should be called every time
Expect(int(countingManager.getQueries)).To(Equal(100))
})
It("should cache List() queries", func() {
// when fetched resources multiple times
fetch := func() core_mesh.DataplaneResourceList {
fetched := core_mesh.DataplaneResourceList{}
err := cachedManager.List(context.Background(), &fetched, core_store.ListByMesh("default"))
Expect(err).ToNot(HaveOccurred())
return fetched
}
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
fetch()
wg.Done()
}()
}
wg.Wait()
// then real manager should be called only once
list := fetch()
Expect(list.Items).To(HaveLen(1))
Expect(list.Items[0].GetSpec()).To(MatchProto(res.Spec))
Expect(int(countingManager.listQueries)).To(Equal(1))
// when
time.Sleep(expiration)
// then
list = fetch()
Expect(list.Items).To(HaveLen(1))
Expect(list.Items[0].GetSpec()).To(MatchProto(res.Spec))
Expect(int(countingManager.listQueries)).To(Equal(2))
})
It("should let concurrent List() queries for different types and meshes", test.Within(15*time.Second, func() {
// given ongoing TrafficLog from mesh slow that takes a lot of time to complete
done := make(chan struct{})
go func() {
fetched := core_mesh.DataplaneResourceList{}
err := cachedManager.List(context.Background(), &fetched, core_store.ListByMesh("slow"))
Expect(err).ToNot(HaveOccurred())
close(done)
}()
// when trying to fetch TrafficLog from different mesh that takes normal time to response
fetched := core_mesh.DataplaneResourceList{}
err := cachedManager.List(context.Background(), &fetched, core_store.ListByMesh("default"))
// then first request does not block request for other mesh
Expect(err).ToNot(HaveOccurred())
// when trying to fetch different resource type
fetchedTp := core_mesh.ZoneIngressInsightResourceList{}
err = cachedManager.List(context.Background(), &fetchedTp, core_store.ListByMesh("default"))
// then first request does not block request for other type
Expect(err).ToNot(HaveOccurred())
<-done
}))
It("should cache List() at different key when ordered", test.Within(5*time.Second, func() {
// when fetched resources multiple times
fetch := func(ordered bool) core_mesh.DataplaneResourceList {
fetched := core_mesh.DataplaneResourceList{}
var err error
if ordered {
err = cachedManager.List(context.Background(), &fetched, core_store.ListOrdered(), core_store.ListByMesh("default"))
} else {
err = cachedManager.List(context.Background(), &fetched, core_store.ListByMesh("default"))
}
Expect(err).ToNot(HaveOccurred())
return fetched
}
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
fetch(false)
wg.Done()
}()
}
wg.Wait()
// then real manager should be called only once
list := fetch(false)
Expect(list.Items).To(HaveLen(1))
Expect(list.Items[0].GetSpec()).To(MatchProto(res.Spec))
Expect(int(countingManager.listQueries)).To(Equal(1))
// when call for ordered data
list = fetch(true)
// then real manager should be called
Expect(list.Items).To(HaveLen(1))
Expect(list.Items[0].GetSpec()).To(MatchProto(res.Spec))
Expect(int(countingManager.listQueries)).To(Equal(2))
}))
})