| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package mongo |
| |
| import ( |
| "fmt" |
| |
| "github.com/go-chassis/cari/db" |
| dconfig "github.com/go-chassis/cari/db/config" |
| |
| "github.com/apache/servicecomb-service-center/datasource" |
| "github.com/apache/servicecomb-service-center/datasource/mongo/heartbeat" |
| "github.com/apache/servicecomb-service-center/datasource/mongo/sd" |
| "github.com/apache/servicecomb-service-center/pkg/log" |
| "github.com/apache/servicecomb-service-center/server/config" |
| "github.com/apache/servicecomb-service-center/server/plugin/security/tlsconf" |
| ) |
| |
| const defaultExpireTime = 300 |
| const defaultPoolSize = 1000 |
| |
| func init() { |
| datasource.Install("mongo", NewDataSource) |
| } |
| |
| type DataSource struct { |
| metadataManager datasource.MetadataManager |
| sysManager datasource.SystemManager |
| depManager datasource.DependencyManager |
| scManager datasource.SCManager |
| metricsManager datasource.MetricsManager |
| syncManager datasource.SyncManager |
| } |
| |
| func (ds *DataSource) SystemManager() datasource.SystemManager { |
| return ds.sysManager |
| } |
| |
| func (ds *DataSource) DependencyManager() datasource.DependencyManager { |
| return ds.depManager |
| } |
| |
| func (ds *DataSource) MetadataManager() datasource.MetadataManager { |
| return ds.metadataManager |
| } |
| |
| func (ds *DataSource) SCManager() datasource.SCManager { |
| return ds.scManager |
| } |
| |
| func (ds *DataSource) MetricsManager() datasource.MetricsManager { |
| return ds.metricsManager |
| } |
| |
| func (ds *DataSource) SyncManager() datasource.SyncManager { |
| return ds.syncManager |
| } |
| |
| func NewDataSource(opts datasource.Options) (datasource.DataSource, error) { |
| // TODO: construct a reasonable DataSource instance |
| inst := &DataSource{} |
| // TODO: deal with exception |
| if err := inst.initialize(); err != nil { |
| return nil, err |
| } |
| inst.scManager = &SCManager{} |
| inst.depManager = &DepManager{} |
| inst.sysManager = &SysManager{} |
| inst.metadataManager = &MetadataManager{ |
| InstanceTTL: opts.InstanceTTL, |
| } |
| inst.metricsManager = &MetricsManager{} |
| inst.syncManager = &SyncManager{} |
| return inst, nil |
| } |
| |
| func (ds *DataSource) initialize() error { |
| var err error |
| // init heartbeat plugins |
| err = ds.initPlugins() |
| if err != nil { |
| return err |
| } |
| // init mongo client |
| err = ds.initClient() |
| if err != nil { |
| return err |
| } |
| // create db index and validator |
| ensureDB() |
| |
| // if fast register enabled, init fast register service |
| initFastRegister() |
| |
| // init cache |
| ds.initStore() |
| return nil |
| } |
| |
| func (ds *DataSource) initPlugins() error { |
| kind := config.GetString("heartbeat.kind", "cache") |
| err := heartbeat.Init(heartbeat.Options{PluginImplName: heartbeat.ImplName(kind)}) |
| if err != nil { |
| log.Fatal("heartbeat init failed", err) |
| return err |
| } |
| return nil |
| } |
| |
| func (ds *DataSource) initClient() error { |
| cfg := dconfig.Config{Kind: "mongo"} |
| cfg.URI = config.GetString("registry.mongo.cluster.uri", "mongodb://localhost:27017", |
| config.WithStandby("manager_cluster")) |
| cfg.SSLEnabled = config.GetBool("ssl.enable", false) |
| cfg.Logger = log.Logger |
| if cfg.SSLEnabled { |
| tlsConfig, err := tlsconf.ClientConfig() |
| if err != nil { |
| log.Fatal("get datasource tlsConfig failed", err) |
| return err |
| } |
| cfg.TLSConfig = tlsConfig |
| } |
| poolSize := config.GetInt("registry.mongo.cluster.poolSize", defaultPoolSize) |
| if poolSize <= 0 { |
| log.Warn(fmt.Sprintf("mongo cluster poolSize[%d] is too small, set to default size", poolSize)) |
| poolSize = defaultPoolSize |
| } |
| cfg.PoolSize = poolSize |
| return db.Init(&cfg) |
| } |
| |
| func (ds *DataSource) initStore() { |
| if !config.GetRegistry().EnableCache { |
| log.Debug("cache is disabled") |
| return |
| } |
| sd.Store().Run() |
| <-sd.Store().Ready() |
| } |
| |
| func initFastRegister() { |
| fastRegConfig := FastRegConfiguration() |
| |
| if fastRegConfig.QueueSize > 0 { |
| fastRegisterService := NewFastRegisterInstanceService() |
| SetFastRegisterInstanceService(fastRegisterService) |
| |
| NewRegisterTimeTask().Start() |
| } |
| } |