| // Licensed to the Apache Software Foundation (ASF) under one or more |
| // contributor license agreements. See the NOTICE file distributed with |
| // this work for additional information regarding copyright ownership. |
| // The ASF licenses this file to You under the Apache License, Version 2.0 |
| // (the "License"); you may not use this file except in compliance with |
| // the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package etcd |
| |
| import ( |
| "context" |
| "sort" |
| "strings" |
| "sync" |
| "time" |
| |
| "github.com/apache/servicecomb-service-center/datasource" |
| "github.com/apache/servicecomb-service-center/datasource/etcd/client" |
| "github.com/apache/servicecomb-service-center/datasource/etcd/event" |
| "github.com/apache/servicecomb-service-center/datasource/etcd/job" |
| "github.com/apache/servicecomb-service-center/datasource/etcd/kv" |
| "github.com/apache/servicecomb-service-center/datasource/etcd/mux" |
| "github.com/apache/servicecomb-service-center/datasource/etcd/sd" |
| "github.com/apache/servicecomb-service-center/pkg/etcdsync" |
| "github.com/apache/servicecomb-service-center/pkg/gopool" |
| "github.com/apache/servicecomb-service-center/pkg/log" |
| "github.com/apache/servicecomb-service-center/server/config" |
| ) |
| |
| var clustersIndex = make(map[string]int) |
| |
| func init() { |
| datasource.Install("etcd", NewDataSource) |
| datasource.Install("embeded_etcd", NewDataSource) |
| } |
| |
| type DataSource struct { |
| // SchemaEditable determines whether schema modification is allowed for |
| SchemaEditable bool |
| // InstanceTTL options |
| InstanceTTL int64 |
| |
| lockMux sync.Mutex |
| locks map[string]*etcdsync.DLock |
| } |
| |
| func NewDataSource(opts datasource.Options) (datasource.DataSource, error) { |
| // TODO: construct a reasonable DataSource instance |
| log.Warnf("data source enable etcd mode") |
| |
| inst := &DataSource{ |
| SchemaEditable: opts.SchemaEditable, |
| InstanceTTL: opts.InstanceTTL, |
| locks: make(map[string]*etcdsync.DLock), |
| } |
| |
| registryAddresses := strings.Join(Configuration().RegistryAddresses(), ",") |
| Configuration().SslEnabled = opts.SslEnabled && strings.Contains(strings.ToLower(registryAddresses), "https://") |
| |
| if err := inst.initialize(opts); err != nil { |
| return nil, err |
| } |
| return inst, nil |
| } |
| |
| func (ds *DataSource) initialize(opts datasource.Options) error { |
| ds.initClustersIndex() |
| // init client/sd plugins |
| ds.initPlugins(opts) |
| // Add events handlers |
| event.Initialize() |
| // Wait for kv store ready |
| ds.initKvStore() |
| // Compact |
| ds.autoCompact() |
| // Jobs |
| job.ClearNoInstanceServices() |
| return nil |
| } |
| |
| func (ds *DataSource) initClustersIndex() { |
| var clusters []string |
| for name := range Configuration().Clusters { |
| clusters = append(clusters, name) |
| } |
| sort.Strings(clusters) |
| for i, name := range clusters { |
| clustersIndex[name] = i |
| } |
| } |
| |
| func (ds *DataSource) initPlugins(opts datasource.Options) { |
| err := client.Init(opts) |
| if err != nil { |
| log.Fatalf(err, "client init failed") |
| } |
| kind := config.GetString("discovery.kind", "", config.WithStandby("discovery_plugin")) |
| err = sd.Init(sd.Options{Kind: sd.Kind(kind)}) |
| if err != nil { |
| log.Fatalf(err, "sd init failed") |
| } |
| } |
| |
| func (ds *DataSource) initKvStore() { |
| kv.Store().Run() |
| <-kv.Store().Ready() |
| } |
| |
| func (ds *DataSource) autoCompact() { |
| delta := Configuration().CompactIndexDelta |
| interval := Configuration().CompactInterval |
| if delta <= 0 || interval == 0 { |
| return |
| } |
| gopool.Go(func(ctx context.Context) { |
| log.Infof("enabled the automatic compact mechanism, compact once every %s, reserve %d", interval, delta) |
| for { |
| select { |
| case <-ctx.Done(): |
| return |
| case <-time.After(interval): |
| lock, err := mux.Try(mux.GlobalLock) |
| if err != nil { |
| log.Errorf(err, "can not compact backend by this service center instance now") |
| continue |
| } |
| |
| err = client.Instance().Compact(ctx, delta) |
| if err != nil { |
| log.Error("", err) |
| } |
| |
| if err := lock.Unlock(); err != nil { |
| log.Error("", err) |
| } |
| } |
| } |
| }) |
| } |