blob: 4d66c5d104c511f341fc0e7a2d287c3f2f368616 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package etcd
import (
"context"
"fmt"
"sort"
"strings"
"time"
"github.com/apache/servicecomb-service-center/datasource"
"github.com/apache/servicecomb-service-center/datasource/etcd/event"
"github.com/apache/servicecomb-service-center/datasource/etcd/sd"
"github.com/apache/servicecomb-service-center/datasource/etcd/state"
tracer "github.com/apache/servicecomb-service-center/datasource/etcd/tracing"
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/server/config"
"github.com/go-chassis/cari/dlock"
"github.com/go-chassis/foundation/gopool"
"github.com/little-cui/etcdadpt"
"github.com/little-cui/etcdadpt/middleware/tracing"
)
// compactLockKey is the distributed lock key that autoCompact acquires with
// dlock.TryLock before compacting and releases with dlock.Unlock afterwards.
const compactLockKey = "/etcd-compact"
// clustersIndex maps a cluster name to its position in the alphabetically
// sorted list of cluster names; it is filled by initClustersIndex.
var clustersIndex = make(map[string]int)
// init installs NewDataSource as the constructor for every kind name this
// package answers to, then calls sd.RegisterInnerTypes.
func init() {
	kinds := []string{
		"etcd",
		"embeded_etcd", //TODO remove misspell in future
		"embedded_etcd",
	}
	for _, kind := range kinds {
		datasource.Install(kind, NewDataSource)
	}
	sd.RegisterInnerTypes()
}
// DataSource is the data source type built by NewDataSource. It keeps the
// options it was created with and one manager per datasource manager
// interface; each manager is exposed through the accessor method of the
// same name.
type DataSource struct {
// Options points to a copy of the options passed to NewDataSource.
Options *datasource.Options
// metadataManager is returned by MetadataManager().
metadataManager datasource.MetadataManager
// sysManager is returned by SystemManager().
sysManager datasource.SystemManager
// depManager is returned by DependencyManager().
depManager datasource.DependencyManager
// scManager is returned by SCManager().
scManager datasource.SCManager
// metricsManager is returned by MetricsManager().
metricsManager datasource.MetricsManager
// syncManager is returned by SyncManager().
syncManager datasource.SyncManager
}
// SystemManager returns the datasource.SystemManager held by this data source.
func (ds *DataSource) SystemManager() datasource.SystemManager {
return ds.sysManager
}
// DependencyManager returns the datasource.DependencyManager held by this
// data source.
func (ds *DataSource) DependencyManager() datasource.DependencyManager {
return ds.depManager
}
// MetadataManager returns the datasource.MetadataManager held by this data
// source.
func (ds *DataSource) MetadataManager() datasource.MetadataManager {
return ds.metadataManager
}
// SCManager returns the datasource.SCManager held by this data source.
func (ds *DataSource) SCManager() datasource.SCManager {
return ds.scManager
}
// MetricsManager returns the datasource.MetricsManager held by this data
// source.
func (ds *DataSource) MetricsManager() datasource.MetricsManager {
return ds.metricsManager
}
// SyncManager returns the datasource.SyncManager held by this data source.
func (ds *DataSource) SyncManager() datasource.SyncManager {
return ds.syncManager
}
// NewDataSource builds the data source registered by this package.
//
// The backend is initialized first; only when that succeeds are the manager
// implementations attached. If initialization fails, the error is returned
// and no data source is produced.
func NewDataSource(opts datasource.Options) (datasource.DataSource, error) {
	log.Warn("data source enable etcd mode")

	ds := &DataSource{Options: &opts}
	err := ds.initialize()
	if err != nil {
		return nil, err
	}

	// The metadata manager is the only one configured from the options.
	metadata := &MetadataManager{
		InstanceTTL:        opts.InstanceTTL,
		InstanceProperties: opts.InstanceProperties,
	}
	ds.metadataManager = metadata
	ds.sysManager = &SysManager{}
	ds.depManager = &DepManager{}
	ds.scManager = &SCManager{}
	ds.metricsManager = &MetricsManager{}
	ds.syncManager = &SyncManager{}

	return ds, nil
}
// initialize prepares the backend for use: it calls initKvStore and then
// autoCompact, in that order. As written it always returns nil; the error
// result exists for the caller in NewDataSource.
func (ds *DataSource) initialize() error {
// Wait for kv store ready
ds.initKvStore()
// Compact
ds.autoCompact()
return nil
}
// initClustersIndex fills the package-level clustersIndex map. The cluster
// names reported by etcdadpt.ListCluster are sorted, and each name is mapped
// to its position in that sorted order. A listing failure is reported
// through log.Fatal.
func (ds *DataSource) initClustersIndex() {
	clusters, err := etcdadpt.ListCluster(context.Background())
	if err != nil {
		log.Fatal("init clusters index failed", err)
	}

	// Collect the names and sort them, so the assigned index does not depend
	// on map iteration order.
	names := make([]string, 0, len(clusters))
	for name := range clusters {
		names = append(names, name)
	}
	sort.Strings(names)

	for idx := range names {
		clustersIndex[names[idx]] = idx
	}
}
// initPlugins wires up the backend in three steps: it configures and
// initializes the etcdadpt client from the data source options, builds the
// clusters index, and initializes the discovery state. A failure in client
// or state initialization is reported through log.Fatal.
func (ds *DataSource) initPlugins() {
	opts := ds.Options

	// registry
	cfg := Configuration()
	cfg.Kind = opts.Kind
	cfg.Logger = opts.Logger
	// SSL is only turned on when it is requested in the options AND the
	// configured cluster addresses contain an https endpoint.
	addrs := strings.ToLower(cfg.ClusterAddresses)
	cfg.SslEnabled = opts.SslEnabled && strings.Contains(addrs, "https://")
	cfg.TLSConfig = opts.TLSConfig
	cfg.ConnectedFunc = opts.ConnectedFunc
	cfg.ErrorFunc = opts.ErrorFunc

	tracing.Register(tracer.New())
	if err := etcdadpt.Init(cfg); err != nil {
		log.Fatal("client init failed", err)
	}

	// clusters
	ds.initClustersIndex()

	// discovery
	discoveryKind := config.GetString("discovery.kind", "etcd", config.WithStandby("discovery_plugin"))
	stateCfg := state.Config{
		Kind:        discoveryKind,
		ClusterName: cfg.ClusterName,
		Logger:      opts.Logger,
		EnableCache: opts.EnableCache,
	}
	if err := state.Init(stateCfg); err != nil {
		log.Fatal("sd init failed", err)
	}
}
// initKvStore sets up the kv store: it calls initPlugins first and then
// event.Initialize, in that order.
func (ds *DataSource) initKvStore() {
// init client/sd plugins
ds.initPlugins()
// Add events handlers
event.Initialize()
}
// autoCompact starts a background goroutine that compacts the backend once
// every Options.CompactInterval, passing Options.CompactIndexDelta as the
// reserve value to Compact.
//
// The mechanism is disabled (nothing is started) when either option is not
// positive. Each round takes the distributed lock compactLockKey first; if
// the lock cannot be taken the round is skipped and retried after the next
// interval. The goroutine exits when the context supplied by gopool is done.
func (ds *DataSource) autoCompact() {
	delta := ds.Options.CompactIndexDelta
	interval := ds.Options.CompactInterval
	// A non-positive interval would make the timer fire immediately on every
	// iteration and turn the loop below into a busy loop against the backend,
	// so it is treated as "disabled", the same as a zero interval.
	if delta <= 0 || interval <= 0 {
		return
	}
	gopool.Go(func(ctx context.Context) {
		log.Info(fmt.Sprintf("enabled the automatic compact mechanism, compact once every %s, reserve %d", interval, delta))

		// compact runs one compaction round under the distributed lock.
		compact := func() {
			if err := dlock.TryLock(compactLockKey, -1); err != nil {
				log.Error("can not compact backend by this service center instance now", err)
				return
			}
			// Deferred so the lock is released even if Compact panics.
			defer func() {
				if err := dlock.Unlock(compactLockKey); err != nil {
					log.Error("unlock failed", err)
				}
			}()
			if err := etcdadpt.Instance().Compact(ctx, delta); err != nil {
				log.Error("compact backend failed", err)
			}
		}

		// One reusable timer instead of a new time.After per iteration; it is
		// re-armed only after a round finishes, so the interval is measured
		// from the end of one round to the start of the next.
		timer := time.NewTimer(interval)
		defer timer.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-timer.C:
				compact()
				timer.Reset(interval)
			}
		}
	})
}