blob: 304847ce99e6ba8791ebd6ae10e0d17daf1c79e7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package backend
import (
"context"
"errors"
"github.com/apache/servicecomb-service-center/pkg/gopool"
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/pkg/task"
"github.com/apache/servicecomb-service-center/pkg/util"
"github.com/apache/servicecomb-service-center/server/core"
"github.com/apache/servicecomb-service-center/server/plugin"
"github.com/apache/servicecomb-service-center/server/plugin/discovery"
"github.com/apache/servicecomb-service-center/server/plugin/registry"
"time"
)
var store = &KvStore{}
func init() {
store.Initialize()
registerInnerTypes()
}
type KvStore struct {
AddOns map[discovery.Type]AddOn
adaptors util.ConcurrentMap
taskService task.Service
ready chan struct{}
goroutine *gopool.Pool
isClose bool
rev int64
}
func (s *KvStore) Initialize() {
s.AddOns = make(map[discovery.Type]AddOn)
s.taskService = task.NewTaskService()
s.ready = make(chan struct{})
s.goroutine = gopool.New(context.Background())
}
func (s *KvStore) OnCacheEvent(evt discovery.KvEvent) {
if s.rev < evt.Revision {
s.rev = evt.Revision
}
}
func (s *KvStore) InjectConfig(cfg *discovery.Config) *discovery.Config {
return cfg.AppendEventFunc(s.OnCacheEvent)
}
func (s *KvStore) repo() discovery.AdaptorRepository {
return plugin.Plugins().Discovery()
}
func (s *KvStore) getOrCreateAdaptor(t discovery.Type) discovery.Adaptor {
v, _ := s.adaptors.Fetch(t, func() (interface{}, error) {
addOn, ok := s.AddOns[t]
if ok {
adaptor := s.repo().New(t, addOn.(AddOn).Config())
adaptor.Run()
return adaptor, nil
}
log.Warnf("type '%s' not found", t)
return nil, nil
})
return v.(discovery.Adaptor)
}
func (s *KvStore) Run() {
s.goroutine.Do(s.store)
s.goroutine.Do(s.autoClearCache)
s.taskService.Run()
}
func (s *KvStore) store(ctx context.Context) {
// new all types
for _, t := range discovery.Types {
select {
case <-ctx.Done():
return
case <-s.getOrCreateAdaptor(t).Ready():
}
}
util.SafeCloseChan(s.ready)
log.Debugf("all adaptors are ready")
}
func (s *KvStore) autoClearCache(ctx context.Context) {
if core.ServerInfo.Config.CacheTTL == 0 {
return
}
log.Infof("start auto clear cache in %v", core.ServerInfo.Config.CacheTTL)
for {
select {
case <-ctx.Done():
return
case <-time.After(core.ServerInfo.Config.CacheTTL):
for _, t := range discovery.Types {
cache, ok := s.getOrCreateAdaptor(t).Cache().(discovery.Cache)
if !ok {
log.Error("the discovery adaptor does not implement the Cache", nil)
continue
}
cache.MarkDirty()
}
log.Warnf("caches are marked dirty!")
}
}
}
func (s *KvStore) Stop() {
if s.isClose {
return
}
s.isClose = true
s.adaptors.ForEach(func(item util.MapItem) bool {
item.Value.(discovery.Adaptor).Stop()
return true
})
s.taskService.Stop()
s.goroutine.Close(true)
util.SafeCloseChan(s.ready)
log.Debugf("store daemon stopped")
}
func (s *KvStore) Ready() <-chan struct{} {
<-s.taskService.Ready()
return s.ready
}
func (s *KvStore) Install(addOn AddOn) (id discovery.Type, err error) {
if addOn == nil || len(addOn.Name()) == 0 || addOn.Config() == nil {
return discovery.TypeError, errors.New("invalid parameter")
}
id, err = discovery.RegisterType(addOn.Name())
if err != nil {
return
}
discovery.EventProxy(id).InjectConfig(addOn.Config())
s.InjectConfig(addOn.Config())
s.AddOns[id] = addOn
log.Infof("install new type %d:%s->%s", id, addOn.Name(), addOn.Config().Key)
return
}
func (s *KvStore) MustInstall(addOn AddOn) discovery.Type {
id, err := s.Install(addOn)
if err != nil {
panic(err)
}
return id
}
func (s *KvStore) Adaptors(id discovery.Type) discovery.Adaptor { return s.getOrCreateAdaptor(id) }
func (s *KvStore) Service() discovery.Adaptor { return s.Adaptors(SERVICE) }
func (s *KvStore) SchemaSummary() discovery.Adaptor { return s.Adaptors(SchemaSummary) }
func (s *KvStore) Instance() discovery.Adaptor { return s.Adaptors(INSTANCE) }
func (s *KvStore) Lease() discovery.Adaptor { return s.Adaptors(LEASE) }
func (s *KvStore) ServiceIndex() discovery.Adaptor { return s.Adaptors(ServiceIndex) }
func (s *KvStore) ServiceAlias() discovery.Adaptor { return s.Adaptors(ServiceAlias) }
func (s *KvStore) ServiceTag() discovery.Adaptor { return s.Adaptors(ServiceTag) }
func (s *KvStore) Rule() discovery.Adaptor { return s.Adaptors(RULE) }
func (s *KvStore) RuleIndex() discovery.Adaptor { return s.Adaptors(RuleIndex) }
func (s *KvStore) Schema() discovery.Adaptor { return s.Adaptors(SCHEMA) }
func (s *KvStore) DependencyRule() discovery.Adaptor { return s.Adaptors(DependencyRule) }
func (s *KvStore) DependencyQueue() discovery.Adaptor { return s.Adaptors(DependencyQueue) }
func (s *KvStore) Domain() discovery.Adaptor { return s.Adaptors(DOMAIN) }
func (s *KvStore) Project() discovery.Adaptor { return s.Adaptors(PROJECT) }
// KeepAlive will always return ok when registry is unavailable
// unless the registry response is LeaseNotFound
func (s *KvStore) KeepAlive(ctx context.Context, opts ...registry.PluginOpOption) (int64, error) {
op := registry.OpPut(opts...)
t := NewLeaseAsyncTask(op)
if op.Mode == registry.ModeNoCache {
log.Debugf("keep alive lease WitchNoCache, request etcd server, op: %s", op)
err := t.Do(ctx)
ttl := t.TTL
return ttl, err
}
err := s.taskService.Add(ctx, t)
if err != nil {
return 0, err
}
itf, err := s.taskService.LatestHandled(t.Key())
if err != nil {
return 0, err
}
pt := itf.(*LeaseTask)
return pt.TTL, pt.Err()
}
func Store() *KvStore {
return store
}
func Revision() int64 {
return store.rev
}