| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package etcd |
| |
| import ( |
| "context" |
| "fmt" |
| "sort" |
| "strings" |
| "time" |
| |
| "github.com/apache/servicecomb-service-center/datasource" |
| "github.com/apache/servicecomb-service-center/datasource/etcd/event" |
| "github.com/apache/servicecomb-service-center/datasource/etcd/sd" |
| "github.com/apache/servicecomb-service-center/datasource/etcd/state" |
| tracer "github.com/apache/servicecomb-service-center/datasource/etcd/tracing" |
| "github.com/apache/servicecomb-service-center/pkg/log" |
| "github.com/apache/servicecomb-service-center/server/config" |
| "github.com/go-chassis/cari/dlock" |
| "github.com/go-chassis/foundation/gopool" |
| "github.com/little-cui/etcdadpt" |
| "github.com/little-cui/etcdadpt/middleware/tracing" |
| ) |
| |
| const compactLockKey = "/etcd-compact" |
| |
| var clustersIndex = make(map[string]int) |
| |
| func init() { |
| datasource.Install("etcd", NewDataSource) |
| datasource.Install("embeded_etcd", NewDataSource) //TODO remove misspell in future |
| datasource.Install("embedded_etcd", NewDataSource) |
| |
| sd.RegisterInnerTypes() |
| } |
| |
| type DataSource struct { |
| Options *datasource.Options |
| |
| metadataManager datasource.MetadataManager |
| sysManager datasource.SystemManager |
| depManager datasource.DependencyManager |
| scManager datasource.SCManager |
| metricsManager datasource.MetricsManager |
| syncManager datasource.SyncManager |
| } |
| |
| func (ds *DataSource) SystemManager() datasource.SystemManager { |
| return ds.sysManager |
| } |
| |
| func (ds *DataSource) DependencyManager() datasource.DependencyManager { |
| return ds.depManager |
| } |
| |
| func (ds *DataSource) MetadataManager() datasource.MetadataManager { |
| return ds.metadataManager |
| } |
| |
| func (ds *DataSource) SCManager() datasource.SCManager { |
| return ds.scManager |
| } |
| |
| func (ds *DataSource) MetricsManager() datasource.MetricsManager { |
| return ds.metricsManager |
| } |
| |
| func (ds *DataSource) SyncManager() datasource.SyncManager { |
| return ds.syncManager |
| } |
| |
| func NewDataSource(opts datasource.Options) (datasource.DataSource, error) { |
| log.Warn("data source enable etcd mode") |
| |
| inst := &DataSource{ |
| Options: &opts, |
| } |
| if err := inst.initialize(); err != nil { |
| return nil, err |
| } |
| inst.metadataManager = &MetadataManager{ |
| InstanceTTL: opts.InstanceTTL, |
| InstanceProperties: opts.InstanceProperties, |
| } |
| inst.sysManager = &SysManager{} |
| inst.depManager = &DepManager{} |
| inst.scManager = &SCManager{} |
| inst.metricsManager = &MetricsManager{} |
| inst.syncManager = &SyncManager{} |
| return inst, nil |
| } |
| |
| func (ds *DataSource) initialize() error { |
| // Wait for kv store ready |
| ds.initKvStore() |
| // Compact |
| ds.autoCompact() |
| return nil |
| } |
| |
| func (ds *DataSource) initClustersIndex() { |
| clusterMap, err := etcdadpt.ListCluster(context.Background()) |
| if err != nil { |
| log.Fatal("init clusters index failed", err) |
| } |
| var clusters []string |
| for name := range clusterMap { |
| clusters = append(clusters, name) |
| } |
| sort.Strings(clusters) |
| for i, name := range clusters { |
| clustersIndex[name] = i |
| } |
| } |
| |
| func (ds *DataSource) initPlugins() { |
| // registry |
| etcdCfg := Configuration() |
| etcdCfg.Kind = ds.Options.Kind |
| etcdCfg.Logger = ds.Options.Logger |
| isHTTPS := strings.Contains(strings.ToLower(etcdCfg.ClusterAddresses), "https://") |
| etcdCfg.SslEnabled = ds.Options.SslEnabled && isHTTPS |
| etcdCfg.TLSConfig = ds.Options.TLSConfig |
| etcdCfg.ConnectedFunc = ds.Options.ConnectedFunc |
| etcdCfg.ErrorFunc = ds.Options.ErrorFunc |
| tracing.Register(tracer.New()) |
| err := etcdadpt.Init(etcdCfg) |
| if err != nil { |
| log.Fatal("client init failed", err) |
| } |
| // clusters |
| ds.initClustersIndex() |
| |
| // discovery |
| kind := config.GetString("discovery.kind", "etcd", config.WithStandby("discovery_plugin")) |
| err = state.Init(state.Config{ |
| Kind: kind, |
| ClusterName: etcdCfg.ClusterName, |
| Logger: ds.Options.Logger, |
| EnableCache: ds.Options.EnableCache, |
| }) |
| if err != nil { |
| log.Fatal("sd init failed", err) |
| } |
| } |
| |
| func (ds *DataSource) initKvStore() { |
| // init client/sd plugins |
| ds.initPlugins() |
| // Add events handlers |
| event.Initialize() |
| } |
| |
| func (ds *DataSource) autoCompact() { |
| delta := ds.Options.CompactIndexDelta |
| interval := ds.Options.CompactInterval |
| if delta <= 0 || interval == 0 { |
| return |
| } |
| gopool.Go(func(ctx context.Context) { |
| log.Info(fmt.Sprintf("enabled the automatic compact mechanism, compact once every %s, reserve %d", interval, delta)) |
| for { |
| select { |
| case <-ctx.Done(): |
| return |
| case <-time.After(interval): |
| if err := dlock.TryLock(compactLockKey, -1); err != nil { |
| log.Error("can not compact backend by this service center instance now", err) |
| continue |
| } |
| err := etcdadpt.Instance().Compact(ctx, delta) |
| if err != nil { |
| log.Error("", err) |
| } |
| if err := dlock.Unlock(compactLockKey); err != nil { |
| log.Error("unlock failed", err) |
| } |
| } |
| } |
| }) |
| } |