Merge pull request #1226 from robotLJW/master

[feat] using cari db client
diff --git a/eventbase/README.md b/eventbase/README.md
index f373314..2e7127c 100644
--- a/eventbase/README.md
+++ b/eventbase/README.md
@@ -8,7 +8,7 @@
 
 **datasource**: realize the dao operation of etcd and mongo on task and tombstone.
 
-**domain**: task and tombstone request.
+**model**: task and tombstone request.
 
 **service**: Interfaces exposed by task and tombstone.
 
@@ -16,21 +16,32 @@
 
 ### how to use
 
+1.First you should import the eventbase's bootstrap
 ```go
 import (
-	_ "github.com/apache/servicecomb-service-center/eventbase/bootstrap"
-	"github.com/apache/servicecomb-service-center/eventbase/datasource"
-	tasksvc "github.com/apache/servicecomb-service-center/eventbase/service/task"
-	tombstonesvc "github.com/apache/servicecomb-service-center/eventbase/service/tombstone"
+    _ "github.com/apache/servicecomb-service-center/eventbase/bootstrap"
+)
+```
+2.Second you should do Init func
+
+```go
+
+import (
+    "github.com/go-chassis/cari/db/config"
+    
+    _ "github.com/apache/servicecomb-service-center/eventbase/bootstrap"
+    "github.com/apache/servicecomb-service-center/eventbase/datasource"
+    "github.com/apache/servicecomb-service-center/eventbase/service/task"
+    "github.com/apache/servicecomb-service-center/eventbase/service/tombstone"
 )
 
-func Init(){
-	dbCfg := db.Config{
+func main(){
+	cfg := config.Config{
 		Kind: "etcd",
 		URI: "http://127.0.0.1:2379",
 		Timeout: 10 * time.Second,
 	}
-	err := datasource.Init(dbCfg)
+	err := datasource.Init(&cfg)
 	...
 	tasksvc.List(...)
 	tombstonesvc.List(...)
diff --git a/eventbase/bootstrap/bootstrap.go b/eventbase/bootstrap/bootstrap.go
index 5613a80..c9dc65f 100644
--- a/eventbase/bootstrap/bootstrap.go
+++ b/eventbase/bootstrap/bootstrap.go
@@ -18,9 +18,7 @@
 package bootstrap
 
 import (
-	// support embedded etcd
-	_ "github.com/little-cui/etcdadpt/embedded"
-	_ "github.com/little-cui/etcdadpt/remote"
+	_ "github.com/go-chassis/cari/db/bootstrap"
 
 	_ "github.com/apache/servicecomb-service-center/eventbase/datasource/etcd"
 	_ "github.com/apache/servicecomb-service-center/eventbase/datasource/mongo"
diff --git a/eventbase/datasource/etcd/etcd.go b/eventbase/datasource/etcd/etcd.go
index a5389cb..cd03c9e 100644
--- a/eventbase/datasource/etcd/etcd.go
+++ b/eventbase/datasource/etcd/etcd.go
@@ -18,22 +18,14 @@
 package etcd
 
 import (
-	"crypto/tls"
-	"fmt"
-
-	"github.com/go-chassis/cari/db"
-	"github.com/go-chassis/openlog"
-	"github.com/little-cui/etcdadpt"
-
 	"github.com/apache/servicecomb-service-center/eventbase/datasource"
 	"github.com/apache/servicecomb-service-center/eventbase/datasource/etcd/task"
 	"github.com/apache/servicecomb-service-center/eventbase/datasource/etcd/tombstone"
-	"github.com/apache/servicecomb-service-center/eventbase/datasource/tlsutil"
 )
 
 type Datasource struct {
-	taskDao   datasource.TaskDao
-	tombstone datasource.TombstoneDao
+	taskDao      datasource.TaskDao
+	tombstoneDao datasource.TombstoneDao
 }
 
 func (d *Datasource) TaskDao() datasource.TaskDao {
@@ -41,28 +33,11 @@
 }
 
 func (d *Datasource) TombstoneDao() datasource.TombstoneDao {
-	return d.tombstone
+	return d.tombstoneDao
 }
 
-func NewDatasource(c *db.Config) (datasource.DataSource, error) {
-	openlog.Info(fmt.Sprintf("use %s as storage", c.Kind))
-	var tlsConfig *tls.Config
-	if c.SSLEnabled {
-		var err error
-		tlsConfig, err = tlsutil.Config(c)
-		if err != nil {
-			return nil, err
-		}
-	}
-	inst := &Datasource{}
-	inst.taskDao = &task.Dao{}
-	inst.tombstone = &tombstone.Dao{}
-	return inst, etcdadpt.Init(etcdadpt.Config{
-		Kind:             c.Kind,
-		ClusterAddresses: c.URI,
-		SslEnabled:       c.SSLEnabled,
-		TLSConfig:        tlsConfig,
-	})
+func NewDatasource() datasource.DataSource {
+	return &Datasource{taskDao: &task.Dao{}, tombstoneDao: &tombstone.Dao{}}
 }
 
 func init() {
diff --git a/eventbase/datasource/etcd/etcd_test.go b/eventbase/datasource/etcd/etcd_test.go
index d7c806a..834a672 100644
--- a/eventbase/datasource/etcd/etcd_test.go
+++ b/eventbase/datasource/etcd/etcd_test.go
@@ -19,27 +19,15 @@
 
 import (
 	"testing"
-	"time"
 
-	"github.com/go-chassis/cari/db"
 	"github.com/stretchr/testify/assert"
-	// support embedded etcd
-	_ "github.com/little-cui/etcdadpt/embedded"
-	_ "github.com/little-cui/etcdadpt/remote"
 
 	"github.com/apache/servicecomb-service-center/eventbase/datasource/etcd"
-	"github.com/apache/servicecomb-service-center/eventbase/test"
 )
 
 func TestNewDatasource(t *testing.T) {
-	t.Run("create etcd datasource should pass with no error", func(t *testing.T) {
-		cfg := &db.Config{
-			Kind:    test.Etcd,
-			URI:     test.EtcdURI,
-			Timeout: 10 * time.Second,
-		}
-		etcdDatasource, err := etcd.NewDatasource(cfg)
-		assert.NoError(t, err)
+	t.Run("create a datasource should pass", func(t *testing.T) {
+		etcdDatasource := etcd.NewDatasource()
 		assert.NotNil(t, etcdDatasource)
 		assert.NotNil(t, etcdDatasource.TaskDao())
 		assert.NotNil(t, etcdDatasource.TombstoneDao())
diff --git a/eventbase/datasource/etcd/task/task_dao.go b/eventbase/datasource/etcd/task/task_dao.go
index c997752..71bc707 100644
--- a/eventbase/datasource/etcd/task/task_dao.go
+++ b/eventbase/datasource/etcd/task/task_dao.go
@@ -22,7 +22,6 @@
 	"encoding/json"
 
 	"github.com/go-chassis/cari/sync"
-	"github.com/go-chassis/openlog"
 	"github.com/little-cui/etcdadpt"
 
 	"github.com/apache/servicecomb-service-center/eventbase/datasource"
@@ -35,16 +34,13 @@
 func (d *Dao) Create(ctx context.Context, task *sync.Task) (*sync.Task, error) {
 	taskBytes, err := json.Marshal(task)
 	if err != nil {
-		openlog.Error("fail to marshal task")
 		return nil, err
 	}
 	ok, err := etcdadpt.InsertBytes(ctx, key.TaskKey(task.Domain, task.Project, task.ID, task.Timestamp), taskBytes)
 	if err != nil {
-		openlog.Error("fail to create task" + err.Error())
 		return nil, err
 	}
 	if !ok {
-		openlog.Error("create error" + datasource.ErrTaskAlreadyExists.Error())
 		return nil, datasource.ErrTaskAlreadyExists
 	}
 	return task, nil
@@ -54,7 +50,6 @@
 	keyTask := key.TaskKey(task.Domain, task.Project, task.ID, task.Timestamp)
 	resp, err := etcdadpt.Get(ctx, keyTask)
 	if err != nil {
-		openlog.Error("fail to get task" + err.Error())
 		return err
 	}
 	if resp == nil {
@@ -63,14 +58,12 @@
 	var dbTask sync.Task
 	err = json.Unmarshal(resp.Value, &dbTask)
 	if err != nil {
-		openlog.Error("fail to unmarshal" + err.Error())
 		return err
 	}
 	dbTask.Status = task.Status
 
 	taskBytes, err := json.Marshal(dbTask)
 	if err != nil {
-		openlog.Error("fail to marshal" + err.Error())
 		return err
 	}
 	return etcdadpt.PutBytes(ctx, keyTask, taskBytes)
@@ -83,7 +76,6 @@
 	}
 	err := etcdadpt.Txn(ctx, delOptions)
 	if err != nil {
-		openlog.Error("fail to delete task" + err.Error())
 		return err
 	}
 	return nil
@@ -97,14 +89,12 @@
 	tasks := make([]*sync.Task, 0)
 	kvs, _, err := etcdadpt.List(ctx, key.TaskList(opts.Domain, opts.Project))
 	if err != nil {
-		openlog.Error("fail to list task" + err.Error())
 		return tasks, err
 	}
 	for _, kv := range kvs {
 		task := sync.Task{}
 		err := json.Unmarshal(kv.Value, &task)
 		if err != nil {
-			openlog.Error("fail to unmarshal task" + err.Error())
 			continue
 		}
 		if !filterMatch(&task, opts) {
diff --git a/eventbase/datasource/etcd/tombstone/tombstone_dao.go b/eventbase/datasource/etcd/tombstone/tombstone_dao.go
index d42a76f..3cf12d6 100644
--- a/eventbase/datasource/etcd/tombstone/tombstone_dao.go
+++ b/eventbase/datasource/etcd/tombstone/tombstone_dao.go
@@ -22,7 +22,6 @@
 	"encoding/json"
 
 	"github.com/go-chassis/cari/sync"
-	"github.com/go-chassis/openlog"
 	"github.com/little-cui/etcdadpt"
 
 	"github.com/apache/servicecomb-service-center/eventbase/datasource"
@@ -37,7 +36,6 @@
 	tombstoneKey := key.TombstoneKey(req.Domain, req.Project, req.ResourceType, req.ResourceID)
 	kv, err := etcdadpt.Get(ctx, tombstoneKey)
 	if err != nil {
-		openlog.Error("fail to get tombstone" + err.Error())
 		return nil, err
 	}
 	if kv == nil {
@@ -46,7 +44,6 @@
 	tombstone := sync.Tombstone{}
 	err = json.Unmarshal(kv.Value, &tombstone)
 	if err != nil {
-		openlog.Error("fail to unmarshal tombstone" + err.Error())
 		return nil, err
 	}
 	return &tombstone, nil
@@ -55,17 +52,14 @@
 func (d *Dao) Create(ctx context.Context, tombstone *sync.Tombstone) (*sync.Tombstone, error) {
 	tombstoneBytes, err := json.Marshal(tombstone)
 	if err != nil {
-		openlog.Error("fail to marshal tombstone")
 		return nil, err
 	}
 	ok, err := etcdadpt.InsertBytes(ctx, key.TombstoneKey(tombstone.Domain, tombstone.Project,
 		tombstone.ResourceType, tombstone.ResourceID), tombstoneBytes)
 	if err != nil {
-		openlog.Error("fail to create tombstone" + err.Error())
 		return nil, err
 	}
 	if !ok {
-		openlog.Error("create error" + datasource.ErrTombstoneAlreadyExists.Error())
 		return nil, datasource.ErrTombstoneAlreadyExists
 	}
 	return tombstone, nil
@@ -74,11 +68,11 @@
 func (d *Dao) Delete(ctx context.Context, tombstones ...*sync.Tombstone) error {
 	delOptions := make([]etcdadpt.OpOptions, len(tombstones))
 	for i, tombstone := range tombstones {
-		delOptions[i] = etcdadpt.OpDel(etcdadpt.WithStrKey(key.TombstoneKey(tombstone.Domain, tombstone.Project, tombstone.ResourceType, tombstone.ResourceID)))
+		delOptions[i] = etcdadpt.OpDel(etcdadpt.WithStrKey(key.TombstoneKey(tombstone.Domain, tombstone.Project,
+			tombstone.ResourceType, tombstone.ResourceID)))
 	}
 	err := etcdadpt.Txn(ctx, delOptions)
 	if err != nil {
-		openlog.Error("fail to delete tombstone" + err.Error())
 		return err
 	}
 	return nil
@@ -92,14 +86,12 @@
 	tombstones := make([]*sync.Tombstone, 0)
 	kvs, _, err := etcdadpt.List(ctx, key.TombstoneList(opts.Domain, opts.Project))
 	if err != nil {
-		openlog.Error("fail to list tombstone" + err.Error())
 		return tombstones, err
 	}
 	for _, kv := range kvs {
 		tombstone := sync.Tombstone{}
 		err = json.Unmarshal(kv.Value, &tombstone)
 		if err != nil {
-			openlog.Error("fail to unmarshal tombstone" + err.Error())
 			continue
 		}
 		if !filterMatch(&tombstone, opts) {
diff --git a/eventbase/datasource/manager.go b/eventbase/datasource/manager.go
index 06afb9b..d6f4aae 100644
--- a/eventbase/datasource/manager.go
+++ b/eventbase/datasource/manager.go
@@ -22,7 +22,7 @@
 	"time"
 
 	"github.com/go-chassis/cari/db"
-	"github.com/go-chassis/openlog"
+	"github.com/go-chassis/cari/db/config"
 )
 
 const (
@@ -35,7 +35,7 @@
 	plugins        = make(map[string]dataSourceEngine)
 )
 
-type dataSourceEngine func(c *db.Config) (DataSource, error)
+type dataSourceEngine func() DataSource
 
 func GetDataSource() DataSource {
 	return dataSourceInst
@@ -45,34 +45,22 @@
 	plugins[name] = engineFunc
 }
 
-func Init(c db.Config) error {
-	var err error
+func Init(c *config.Config) error {
 	if c.Kind == "" {
 		c.Kind = DefaultDBKind
 	}
+	if c.Timeout == 0 {
+		c.Timeout = DefaultTimeout
+	}
+	err := db.Init(c)
+	if err != nil {
+		return err
+	}
 	f, ok := plugins[c.Kind]
 	if !ok {
 		return fmt.Errorf("do not support %s", c.Kind)
 	}
-	if c.Timeout == 0 {
-		c.Timeout = DefaultTimeout
-	}
-	dbc := &db.Config{
-		Kind:        c.Kind,
-		URI:         c.URI,
-		PoolSize:    c.PoolSize,
-		SSLEnabled:  c.SSLEnabled,
-		RootCA:      c.RootCA,
-		CertFile:    c.CertFile,
-		CertPwdFile: c.CertPwdFile,
-		KeyFile:     c.KeyFile,
-		Timeout:     c.Timeout,
-	}
-
-	if dataSourceInst, err = f(dbc); err != nil {
-		return err
-	}
-	openlog.Info(fmt.Sprintf("use %s as storage", c.Kind))
+	dataSourceInst = f()
 	return nil
 }
 
diff --git a/eventbase/datasource/manager_test.go b/eventbase/datasource/manager_test.go
new file mode 100644
index 0000000..c6bc54a
--- /dev/null
+++ b/eventbase/datasource/manager_test.go
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package datasource_test
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+
+	"github.com/apache/servicecomb-service-center/eventbase/datasource"
+	"github.com/apache/servicecomb-service-center/eventbase/test"
+)
+
+func TestInit(t *testing.T) {
+	t.Run("init config should pass with no error", func(t *testing.T) {
+		err := datasource.Init(&test.DbCfg)
+		assert.Nil(t, err)
+		assert.NotNil(t, datasource.GetDataSource())
+	})
+}
diff --git a/eventbase/datasource/mongo/client/client.go b/eventbase/datasource/mongo/client/client.go
deleted file mode 100644
index 0998a9c..0000000
--- a/eventbase/datasource/mongo/client/client.go
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package client
-
-import (
-	"context"
-	"crypto/tls"
-	"crypto/x509"
-	"errors"
-	"fmt"
-	"io/ioutil"
-	"time"
-
-	"github.com/go-chassis/cari/db"
-	"github.com/go-chassis/foundation/gopool"
-	"github.com/go-chassis/openlog"
-	"go.mongodb.org/mongo-driver/mongo"
-	"go.mongodb.org/mongo-driver/mongo/options"
-
-	"github.com/apache/servicecomb-service-center/eventbase/datasource/mongo/model"
-)
-
-const (
-	MongoCheckDelay     = 2 * time.Second
-	HeathChekRetryTimes = 3
-)
-
-var (
-	ErrOpenDbFailed  = errors.New("open db failed")
-	ErrRootCAMissing = errors.New("rootCAFile is empty in config file")
-)
-
-var client *MongoClient
-
-type MongoClient struct {
-	client *mongo.Client
-	db     *mongo.Database
-	config *db.Config
-
-	err       chan error
-	ready     chan struct{}
-	goroutine *gopool.Pool
-}
-
-func NewMongoClient(config *db.Config) {
-	inst := &MongoClient{}
-	if err := inst.Initialize(config); err != nil {
-		openlog.Error("failed to init mongodb" + err.Error())
-		inst.err <- err
-	}
-	client = inst
-}
-
-func (mc *MongoClient) Err() <-chan error {
-	return mc.err
-}
-
-func (mc *MongoClient) Ready() <-chan struct{} {
-	return mc.ready
-}
-
-func (mc *MongoClient) Close() {
-	if mc.client != nil {
-		if err := mc.client.Disconnect(context.TODO()); err != nil {
-			openlog.Error("[close mongo client] failed disconnect the mongo client" + err.Error())
-		}
-	}
-}
-
-func (mc *MongoClient) Initialize(config *db.Config) (err error) {
-	mc.err = make(chan error, 1)
-	mc.ready = make(chan struct{})
-	mc.goroutine = gopool.New()
-	mc.config = config
-	err = mc.newClient(context.Background())
-	if err != nil {
-		return
-	}
-	mc.startHealthCheck()
-	close(mc.ready)
-	return nil
-}
-
-func (mc *MongoClient) newClient(ctx context.Context) (err error) {
-	clientOptions := []*options.ClientOptions{options.Client().ApplyURI(mc.config.URI)}
-	clientOptions = append(clientOptions, options.Client().SetMaxPoolSize(uint64(mc.config.PoolSize)))
-	if mc.config.SSLEnabled {
-		if mc.config.RootCA == "" {
-			err = ErrRootCAMissing
-			return
-		}
-		pool := x509.NewCertPool()
-		caCert, err := ioutil.ReadFile(mc.config.RootCA)
-		if err != nil {
-			err = fmt.Errorf("read ca cert file %s failed", mc.config.RootCA)
-			openlog.Error("ca cert :" + err.Error())
-			return err
-		}
-		pool.AppendCertsFromPEM(caCert)
-		clientCerts := make([]tls.Certificate, 0)
-		if mc.config.CertFile != "" && mc.config.KeyFile != "" {
-			cert, err := tls.LoadX509KeyPair(mc.config.CertFile, mc.config.KeyFile)
-			if err != nil {
-				openlog.Error("load X509 keyPair failed: " + err.Error())
-				return err
-			}
-			clientCerts = append(clientCerts, cert)
-		}
-		tc := &tls.Config{
-			RootCAs:            pool,
-			InsecureSkipVerify: !mc.config.VerifyPeer,
-			Certificates:       clientCerts,
-		}
-		clientOptions = append(clientOptions, options.Client().SetTLSConfig(tc))
-		openlog.Info("enabled ssl communication to mongodb")
-	}
-	mc.client, err = mongo.Connect(ctx, clientOptions...)
-	if err != nil {
-		openlog.Error("failed to connect to mongo" + err.Error())
-		if derr := mc.client.Disconnect(ctx); derr != nil {
-			openlog.Error("[init mongo client] failed to disconnect mongo clients" + err.Error())
-		}
-		return
-	}
-	mc.db = mc.client.Database(model.DBName)
-	if mc.db == nil {
-		return ErrOpenDbFailed
-	}
-	return nil
-}
-
-func (mc *MongoClient) startHealthCheck() {
-	mc.goroutine.Do(mc.HealthCheck)
-}
-
-func (mc *MongoClient) HealthCheck(ctx context.Context) {
-	for {
-		select {
-		case <-ctx.Done():
-			mc.Close()
-			return
-		case <-time.After(MongoCheckDelay):
-			for i := 0; i < HeathChekRetryTimes; i++ {
-				err := mc.client.Ping(context.Background(), nil)
-				if err == nil {
-					break
-				}
-				openlog.Error(fmt.Sprintf("retry to connect to mongodb %s after %s", mc.config.URI, MongoCheckDelay) + err.Error())
-				select {
-				case <-ctx.Done():
-					mc.Close()
-					return
-				case <-time.After(MongoCheckDelay):
-				}
-			}
-		}
-	}
-}
-
-func GetMongoClient() *MongoClient {
-	return client
-}
-
-// ExecTxn execute a transaction command
-// want to abort transaction, return error in cmd fn impl, otherwise it will commit transaction
-func (mc *MongoClient) ExecTxn(ctx context.Context, cmd func(sessionContext mongo.SessionContext) error) error {
-	session, err := mc.client.StartSession()
-	if err != nil {
-		return err
-	}
-	if err = session.StartTransaction(); err != nil {
-		return err
-	}
-	defer session.EndSession(ctx)
-	if err = mongo.WithSession(ctx, session, func(sc mongo.SessionContext) error {
-		if err = cmd(sc); err != nil {
-			if err = session.AbortTransaction(sc); err != nil {
-				return err
-			}
-		} else {
-			if err = session.CommitTransaction(sc); err != nil {
-				return err
-			}
-		}
-		return nil
-	}); err != nil {
-		return err
-	}
-	return nil
-}
-
-func (mc *MongoClient) GetDB() *mongo.Database {
-	return mc.db
-}
-
-func (mc *MongoClient) CreateIndexes(ctx context.Context, Table string, indexes []mongo.IndexModel) error {
-	_, err := mc.db.Collection(Table).Indexes().CreateMany(ctx, indexes)
-	if err != nil {
-		return err
-	}
-	return nil
-}
diff --git a/eventbase/datasource/mongo/model/types.go b/eventbase/datasource/mongo/model/types.go
index c53efb1..7e09c99 100644
--- a/eventbase/datasource/mongo/model/types.go
+++ b/eventbase/datasource/mongo/model/types.go
@@ -18,8 +18,6 @@
 package model
 
 const (
-	DBName = "servicecomb"
-
 	CollectionTask      = "task"
 	CollectionTombstone = "tombstone"
 	ColumnDomain        = "domain"
diff --git a/eventbase/datasource/mongo/mongo.go b/eventbase/datasource/mongo/mongo.go
index 72f6099..7937b07 100644
--- a/eventbase/datasource/mongo/mongo.go
+++ b/eventbase/datasource/mongo/mongo.go
@@ -18,23 +18,24 @@
 package mongo
 
 import (
-	"strings"
-
-	"github.com/go-chassis/cari/db"
-	"github.com/go-chassis/openlog"
+	dmongo "github.com/go-chassis/cari/db/mongo"
 	"go.mongodb.org/mongo-driver/bson"
-	"gopkg.in/mgo.v2"
+	"go.mongodb.org/mongo-driver/mongo"
+	"go.mongodb.org/mongo-driver/x/bsonx"
 
 	"github.com/apache/servicecomb-service-center/eventbase/datasource"
-	"github.com/apache/servicecomb-service-center/eventbase/datasource/mongo/client"
 	"github.com/apache/servicecomb-service-center/eventbase/datasource/mongo/model"
 	"github.com/apache/servicecomb-service-center/eventbase/datasource/mongo/task"
 	"github.com/apache/servicecomb-service-center/eventbase/datasource/mongo/tombstone"
 )
 
+func init() {
+	datasource.RegisterPlugin("mongo", NewDatasource)
+}
+
 type Datasource struct {
-	taskDao   datasource.TaskDao
-	tombstone datasource.TombstoneDao
+	taskDao      datasource.TaskDao
+	tombstoneDao datasource.TombstoneDao
 }
 
 func (d *Datasource) TaskDao() datasource.TaskDao {
@@ -42,102 +43,50 @@
 }
 
 func (d *Datasource) TombstoneDao() datasource.TombstoneDao {
-	return d.tombstone
+	return d.tombstoneDao
 }
 
-func NewDatasource(config *db.Config) (datasource.DataSource, error) {
-	inst := &Datasource{}
-	inst.taskDao = &task.Dao{}
-	inst.tombstone = &tombstone.Dao{}
-	return inst, inst.initialize(config)
+func NewDatasource() datasource.DataSource {
+	ensureDB()
+	return &Datasource{taskDao: &task.Dao{}, tombstoneDao: &tombstone.Dao{}}
 }
 
-func (d *Datasource) initialize(config *db.Config) error {
-	err := d.initClient(config)
-	if err != nil {
-		return err
+func ensureDB() {
+	ensureTask()
+	ensureTombstone()
+}
+
+func ensureTask() {
+	jsonSchema := bson.M{
+		"bsonType": "object",
+		"required": []string{model.ColumnID, model.ColumnDomain, model.ColumnProject, model.ColumnTimestamp},
 	}
-	ensureDB(config)
-	return nil
-}
-
-func (d *Datasource) initClient(config *db.Config) error {
-	client.NewMongoClient(config)
-	select {
-	case err := <-client.GetMongoClient().Err():
-		return err
-	case <-client.GetMongoClient().Ready():
-		return nil
+	validator := bson.M{
+		"$jsonSchema": jsonSchema,
 	}
+	dmongo.EnsureCollection(model.CollectionTask, validator, []mongo.IndexModel{buildIndexDoc(
+		model.ColumnDomain, model.ColumnProject, model.ColumnID, model.ColumnTimestamp)})
 }
 
-func init() {
-	datasource.RegisterPlugin("mongo", NewDatasource)
-}
-
-func ensureDB(config *db.Config) {
-	session := openSession(config)
-	defer session.Close()
-	session.SetMode(mgo.Primary, true)
-
-	ensureTask(session)
-	ensureTombstone(session)
-}
-
-func openSession(c *db.Config) *mgo.Session {
-	timeout := c.Timeout
-	var err error
-	session, err := mgo.DialWithTimeout(c.URI, timeout)
-	if err != nil {
-		openlog.Warn("can not dial db, retry once:" + err.Error())
-		session, err = mgo.DialWithTimeout(c.URI, timeout)
-		if err != nil {
-			openlog.Fatal("can not dial db:" + err.Error())
-		}
+func ensureTombstone() {
+	jsonSchema := bson.M{
+		"bsonType": "object",
+		"required": []string{model.ColumnResourceID, model.ColumnDomain, model.ColumnProject, model.ColumnResourceType},
 	}
-	return session
-}
-
-func wrapError(err error, skipMsg ...string) {
-	if err != nil {
-		for _, str := range skipMsg {
-			if strings.Contains(err.Error(), str) {
-				openlog.Debug(err.Error())
-				return
-			}
-		}
-		openlog.Error(err.Error())
+	validator := bson.M{
+		"$jsonSchema": jsonSchema,
 	}
+	dmongo.EnsureCollection(model.CollectionTombstone, validator, []mongo.IndexModel{buildIndexDoc(
+		model.ColumnDomain, model.ColumnProject, model.ColumnResourceID, model.ColumnResourceType)})
 }
 
-func ensureTask(session *mgo.Session) {
-	c := session.DB(model.DBName).C(model.CollectionTask)
-	err := c.Create(&mgo.CollectionInfo{Validator: bson.M{
-		model.ColumnID:        bson.M{"$exists": true},
-		model.ColumnDomain:    bson.M{"$exists": true},
-		model.ColumnProject:   bson.M{"$exists": true},
-		model.ColumnTimestamp: bson.M{"$exists": true},
-	}})
-	wrapError(err)
-	err = c.EnsureIndex(mgo.Index{
-		Key:    []string{model.ColumnDomain, model.ColumnProject, model.ColumnID, model.ColumnTimestamp},
-		Unique: true,
-	})
-	wrapError(err)
-}
-
-func ensureTombstone(session *mgo.Session) {
-	c := session.DB(model.DBName).C(model.CollectionTombstone)
-	err := c.Create(&mgo.CollectionInfo{Validator: bson.M{
-		model.ColumnResourceID:   bson.M{"$exists": true},
-		model.ColumnDomain:       bson.M{"$exists": true},
-		model.ColumnProject:      bson.M{"$exists": true},
-		model.ColumnResourceType: bson.M{"$exists": true},
-	}})
-	wrapError(err)
-	err = c.EnsureIndex(mgo.Index{
-		Key:    []string{model.ColumnDomain, model.ColumnProject, model.ColumnResourceID, model.ColumnResourceType},
-		Unique: true,
-	})
-	wrapError(err)
+func buildIndexDoc(keys ...string) mongo.IndexModel {
+	keysDoc := bsonx.Doc{}
+	for _, key := range keys {
+		keysDoc = keysDoc.Append(key, bsonx.Int32(1))
+	}
+	index := mongo.IndexModel{
+		Keys: keysDoc,
+	}
+	return index
 }
diff --git a/eventbase/datasource/mongo/task/task_dao.go b/eventbase/datasource/mongo/task/task_dao.go
index a012871..8d6c180 100644
--- a/eventbase/datasource/mongo/task/task_dao.go
+++ b/eventbase/datasource/mongo/task/task_dao.go
@@ -20,14 +20,13 @@
 import (
 	"context"
 
+	dmongo "github.com/go-chassis/cari/db/mongo"
 	"github.com/go-chassis/cari/sync"
-	"github.com/go-chassis/openlog"
 	"go.mongodb.org/mongo-driver/bson"
 	"go.mongodb.org/mongo-driver/mongo"
 	mopts "go.mongodb.org/mongo-driver/mongo/options"
 
 	"github.com/apache/servicecomb-service-center/eventbase/datasource"
-	"github.com/apache/servicecomb-service-center/eventbase/datasource/mongo/client"
 	"github.com/apache/servicecomb-service-center/eventbase/datasource/mongo/model"
 )
 
@@ -35,28 +34,26 @@
 }
 
 func (d *Dao) Create(ctx context.Context, task *sync.Task) (*sync.Task, error) {
-	collection := client.GetMongoClient().GetDB().Collection(model.CollectionTask)
+	collection := dmongo.GetClient().GetDB().Collection(model.CollectionTask)
 	_, err := collection.InsertOne(ctx, task)
 	if err != nil {
-		openlog.Error("fail to create task" + err.Error())
 		return nil, err
 	}
 	return task, nil
 }
 
 func (d *Dao) Update(ctx context.Context, task *sync.Task) error {
-	collection := client.GetMongoClient().GetDB().Collection(model.CollectionTask)
+	collection := dmongo.GetClient().GetDB().Collection(model.CollectionTask)
 	result, err := collection.UpdateOne(ctx,
-		bson.M{model.ColumnID: task.ID, model.ColumnDomain: task.Domain, model.ColumnProject: task.Project, model.ColumnTimestamp: task.Timestamp},
+		bson.M{model.ColumnID: task.ID, model.ColumnDomain: task.Domain,
+			model.ColumnProject: task.Project, model.ColumnTimestamp: task.Timestamp},
 		bson.D{{Key: "$set", Value: bson.D{
 			{Key: model.ColumnStatus, Value: task.Status}}},
 		})
 	if err != nil {
-		openlog.Error("fail to update task" + err.Error())
 		return err
 	}
 	if result.ModifiedCount == 0 {
-		openlog.Error("fail to update task" + datasource.ErrTaskNotExists.Error())
 		return datasource.ErrTaskNotExists
 	}
 	return nil
@@ -77,22 +74,18 @@
 	}
 
 	var deleteFunc = func(sessionContext mongo.SessionContext) error {
-		collection := client.GetMongoClient().GetDB().Collection(model.CollectionTask)
+		collection := dmongo.GetClient().GetDB().Collection(model.CollectionTask)
 		_, err := collection.DeleteMany(sessionContext, bson.M{"$or": filter})
 		return err
 	}
-	err := client.GetMongoClient().ExecTxn(ctx, deleteFunc)
-	if err != nil {
-		openlog.Error(err.Error())
-	}
-	return err
+	return dmongo.GetClient().ExecTxn(ctx, deleteFunc)
 }
 func (d *Dao) List(ctx context.Context, options ...datasource.TaskFindOption) ([]*sync.Task, error) {
 	opts := datasource.NewTaskFindOptions()
 	for _, o := range options {
 		o(&opts)
 	}
-	collection := client.GetMongoClient().GetDB().Collection(model.CollectionTask)
+	collection := dmongo.GetClient().GetDB().Collection(model.CollectionTask)
 	filter := bson.M{}
 	if opts.Domain != "" {
 		filter[model.ColumnDomain] = opts.Domain
@@ -121,7 +114,6 @@
 	for cur.Next(ctx) {
 		task := &sync.Task{}
 		if err := cur.Decode(task); err != nil {
-			openlog.Error("decode to task error: " + err.Error())
 			return nil, err
 		}
 		tasks = append(tasks, task)
diff --git a/eventbase/datasource/mongo/tombstone/tombstone_dao.go b/eventbase/datasource/mongo/tombstone/tombstone_dao.go
index 0bc038b..85621fc 100644
--- a/eventbase/datasource/mongo/tombstone/tombstone_dao.go
+++ b/eventbase/datasource/mongo/tombstone/tombstone_dao.go
@@ -20,13 +20,12 @@
 import (
 	"context"
 
+	dmongo "github.com/go-chassis/cari/db/mongo"
 	"github.com/go-chassis/cari/sync"
-	"github.com/go-chassis/openlog"
 	"go.mongodb.org/mongo-driver/bson"
 	"go.mongodb.org/mongo-driver/mongo"
 
 	"github.com/apache/servicecomb-service-center/eventbase/datasource"
-	"github.com/apache/servicecomb-service-center/eventbase/datasource/mongo/client"
 	"github.com/apache/servicecomb-service-center/eventbase/datasource/mongo/model"
 	emodel "github.com/apache/servicecomb-service-center/eventbase/model"
 )
@@ -35,33 +34,29 @@
 }
 
 func (d *Dao) Get(ctx context.Context, req *emodel.GetTombstoneRequest) (*sync.Tombstone, error) {
-	collection := client.GetMongoClient().GetDB().Collection(model.CollectionTombstone)
+	collection := dmongo.GetClient().GetDB().Collection(model.CollectionTombstone)
 	filter := bson.M{model.ColumnDomain: req.Domain, model.ColumnProject: req.Project,
 		model.ColumnResourceType: req.ResourceType, model.ColumnResourceID: req.ResourceID}
 	result := collection.FindOne(ctx, filter)
 	if result != nil && result.Err() != nil {
-		openlog.Error("fail to get tombstone" + result.Err().Error())
 		return nil, result.Err()
 	}
 	if result == nil {
-		openlog.Error(datasource.ErrTombstoneNotExists.Error())
 		return nil, datasource.ErrTombstoneNotExists
 	}
 	var tombstone sync.Tombstone
 
 	err := result.Decode(&tombstone)
 	if err != nil {
-		openlog.Error("fail to decode tombstone" + err.Error())
 		return nil, err
 	}
 	return &tombstone, nil
 }
 
 func (d *Dao) Create(ctx context.Context, tombstone *sync.Tombstone) (*sync.Tombstone, error) {
-	collection := client.GetMongoClient().GetDB().Collection(model.CollectionTombstone)
+	collection := dmongo.GetClient().GetDB().Collection(model.CollectionTombstone)
 	_, err := collection.InsertOne(ctx, tombstone)
 	if err != nil {
-		openlog.Error("fail to create tombstone" + err.Error())
 		return nil, err
 	}
 	return tombstone, nil
@@ -81,15 +76,11 @@
 		filter = append(filter, dFilter)
 	}
 	var deleteFunc = func(sessionContext mongo.SessionContext) error {
-		collection := client.GetMongoClient().GetDB().Collection(model.CollectionTombstone)
+		collection := dmongo.GetClient().GetDB().Collection(model.CollectionTombstone)
 		_, err := collection.DeleteMany(sessionContext, bson.M{"$or": filter})
 		return err
 	}
-	err := client.GetMongoClient().ExecTxn(ctx, deleteFunc)
-	if err != nil {
-		openlog.Error(err.Error())
-	}
-	return err
+	return dmongo.GetClient().ExecTxn(ctx, deleteFunc)
 }
 
 func (d *Dao) List(ctx context.Context, options ...datasource.TombstoneFindOption) ([]*sync.Tombstone, error) {
@@ -97,7 +88,7 @@
 	for _, o := range options {
 		o(&opts)
 	}
-	collection := client.GetMongoClient().GetDB().Collection(model.CollectionTombstone)
+	collection := dmongo.GetClient().GetDB().Collection(model.CollectionTombstone)
 	filter := bson.M{}
 	if opts.Domain != "" {
 		filter[model.ColumnDomain] = opts.Domain
@@ -120,7 +111,6 @@
 	for cur.Next(ctx) {
 		tombstone := &sync.Tombstone{}
 		if err := cur.Decode(tombstone); err != nil {
-			openlog.Error("decode to tombstone error: " + err.Error())
 			return nil, err
 		}
 		tombstones = append(tombstones, tombstone)
diff --git a/eventbase/datasource/tlsutil/tlsutil.go b/eventbase/datasource/tlsutil/tlsutil.go
deleted file mode 100644
index 925d20f..0000000
--- a/eventbase/datasource/tlsutil/tlsutil.go
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tlsutil
-
-import (
-	"crypto/tls"
-	"errors"
-	"io/ioutil"
-
-	"github.com/go-chassis/cari/db"
-	"github.com/go-chassis/foundation/stringutil"
-	"github.com/go-chassis/foundation/tlsutil"
-	"github.com/go-chassis/go-chassis/v2/security/cipher"
-	"github.com/go-chassis/openlog"
-)
-
-var ErrRootCAMissing = errors.New("rootCAFile is empty in config file")
-
-func Config(c *db.Config) (*tls.Config, error) {
-	var password string
-	if c.CertPwdFile != "" {
-		pwdBytes, err := ioutil.ReadFile(c.CertPwdFile)
-		if err != nil {
-			openlog.Error("read cert password file failed: " + err.Error())
-			return nil, err
-		}
-		password = TryDecrypt(stringutil.Bytes2str(pwdBytes))
-	}
-	if c.RootCA == "" {
-		openlog.Error(ErrRootCAMissing.Error())
-		return nil, ErrRootCAMissing
-	}
-	opts := append(tlsutil.DefaultClientTLSOptions(),
-		tlsutil.WithVerifyPeer(c.VerifyPeer),
-		tlsutil.WithVerifyHostName(false),
-		tlsutil.WithKeyPass(password),
-		tlsutil.WithCA(c.RootCA),
-		tlsutil.WithCert(c.CertFile),
-		tlsutil.WithKey(c.KeyFile),
-	)
-	return tlsutil.GetClientTLSConfig(opts...)
-}
-
-// TryDecrypt return the src when decrypt failed
-func TryDecrypt(src string) string {
-	res, err := cipher.Decrypt(src)
-	if err != nil {
-		openlog.Info("cipher fallback: " + err.Error())
-		res = src
-	}
-	return res
-}
diff --git a/eventbase/go.mod b/eventbase/go.mod
index 4fe13ce..5f7d568 100644
--- a/eventbase/go.mod
+++ b/eventbase/go.mod
@@ -1,15 +1,11 @@
 module github.com/apache/servicecomb-service-center/eventbase
 
 require (
-	github.com/go-chassis/cari v0.5.1-0.20211227133501-53aa20cf7a44
-	github.com/go-chassis/foundation v0.4.0
+	github.com/go-chassis/cari v0.5.1-0.20220117143645-570968c7043d
 	github.com/go-chassis/go-archaius v1.5.1
-	github.com/go-chassis/go-chassis/v2 v2.3.0
-	github.com/go-chassis/openlog v1.1.3
 	github.com/little-cui/etcdadpt v0.3.2
 	github.com/stretchr/testify v1.7.0
 	go.mongodb.org/mongo-driver v1.4.2
-	gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22
 )
 
 require (
@@ -24,6 +20,9 @@
 	github.com/dustin/go-humanize v1.0.0 // indirect
 	github.com/form3tech-oss/jwt-go v3.2.3+incompatible // indirect
 	github.com/fsnotify/fsnotify v1.4.7 // indirect
+	github.com/go-chassis/foundation v0.4.0 // indirect
+	github.com/go-chassis/go-chassis/v2 v2.3.0 // indirect
+	github.com/go-chassis/openlog v1.1.3 // indirect
 	github.com/go-stack/stack v1.8.0 // indirect
 	github.com/gofrs/uuid v4.0.0+incompatible // indirect
 	github.com/gogo/protobuf v1.3.2 // indirect
diff --git a/eventbase/go.sum b/eventbase/go.sum
index c7f4404..1ca8833 100644
--- a/eventbase/go.sum
+++ b/eventbase/go.sum
@@ -112,12 +112,11 @@
 github.com/go-chassis/cari v0.0.0-20201210041921-7b6fbef2df11/go.mod h1:MgtsEI0AM4Ush6Lyw27z9Gk4nQ/8GWTSXrFzupawWDM=
 github.com/go-chassis/cari v0.4.0/go.mod h1:av/19fqwEP4eOC8unL/z67AAbFDwXUCko6SKa4Avrd8=
 github.com/go-chassis/cari v0.5.0/go.mod h1:av/19fqwEP4eOC8unL/z67AAbFDwXUCko6SKa4Avrd8=
-github.com/go-chassis/cari v0.5.1-0.20211227133501-53aa20cf7a44 h1:2JThhCkuZ5mneXFy0qRvKS7HG1/omq+Hc6I4yNhOZkI=
-github.com/go-chassis/cari v0.5.1-0.20211227133501-53aa20cf7a44/go.mod h1:HG0Olv4sy/4e/3e9S0pofO0pzchaDjJ0hMweyFU7d5Q=
+github.com/go-chassis/cari v0.5.1-0.20220117143645-570968c7043d h1:9G1yjs6+8wxtZ+Qy4sj6bchEysEnmBqdhGaDALUnfMQ=
+github.com/go-chassis/cari v0.5.1-0.20220117143645-570968c7043d/go.mod h1:tKTzguHTGohMCgkcWNZWtA4TwfcsJrIXpfYxsQtb7uw=
 github.com/go-chassis/foundation v0.2.2-0.20201210043510-9f6d3de40234/go.mod h1:2PjwqpVwYEVaAldl5A58a08viH8p27pNeYaiE3ZxOBA=
 github.com/go-chassis/foundation v0.2.2/go.mod h1:2PjwqpVwYEVaAldl5A58a08viH8p27pNeYaiE3ZxOBA=
 github.com/go-chassis/foundation v0.3.0/go.mod h1:2PjwqpVwYEVaAldl5A58a08viH8p27pNeYaiE3ZxOBA=
-github.com/go-chassis/foundation v0.3.1-0.20210806081520-3bd92d1ef787/go.mod h1:6NsIUaHghTFRGfCBcZN011zl196F6OR5QvD9N+P4oWU=
 github.com/go-chassis/foundation v0.4.0 h1:z0xETnSxF+vRXWjoIhOdzt6rywjZ4sB++utEl4YgWEY=
 github.com/go-chassis/foundation v0.4.0/go.mod h1:6NsIUaHghTFRGfCBcZN011zl196F6OR5QvD9N+P4oWU=
 github.com/go-chassis/go-archaius v1.5.1 h1:1FrNyzzmD6o6BIjPF8uQ4Cc+u7qYIgQTpDk8uopBqfo=
@@ -313,8 +312,6 @@
 github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
 github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
 github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ictyFfxY=
-github.com/little-cui/etcdadpt v0.2.1 h1:eT1A+BV1/2/dmmZA2Nl+cc7uTMuwd6T6DD+JrXr8xcA=
-github.com/little-cui/etcdadpt v0.2.1/go.mod h1:727wftF2FS4vfkgFLmIvQue1XH+9u4lK2/hd6L7OAC8=
 github.com/little-cui/etcdadpt v0.3.2 h1:EBXPBxddZXTgWvGsIdAqqG6JCu1TouPNUhVVj9swt/s=
 github.com/little-cui/etcdadpt v0.3.2/go.mod h1:HnRRpIrVEVNWobkiCvG2EHLWKKZ+L047EcI29ma2zA4=
 github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
@@ -735,8 +732,6 @@
 gopkg.in/go-playground/validator.v8 v8.18.2/go.mod h1:RX2a/7Ha8BgOhfk7j780h4/u/RRjR0eouCJSH80/M2Y=
 gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
 gopkg.in/ini.v1 v1.51.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
-gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22 h1:VpOs+IwYnYBaFnrNAeB8UUWtL3vEUnzSCL1nVjPhqrw=
-gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22/go.mod h1:yeKp02qBN3iKW1OzL3MGk2IdtZzaj7SFntXj72NppTA=
 gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8=
 gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
 gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo=
diff --git a/eventbase/service/task/task_svc_test.go b/eventbase/service/task/task_svc_test.go
index 304741e..c325d93 100644
--- a/eventbase/service/task/task_svc_test.go
+++ b/eventbase/service/task/task_svc_test.go
@@ -32,7 +32,7 @@
 )
 
 func init() {
-	err := datasource.Init(test.DbCfg)
+	err := datasource.Init(&test.DbCfg)
 	if err != nil {
 		panic(err)
 	}
diff --git a/eventbase/service/tombstone/tombstone_svc_test.go b/eventbase/service/tombstone/tombstone_svc_test.go
index 1330104..665c1d3 100644
--- a/eventbase/service/tombstone/tombstone_svc_test.go
+++ b/eventbase/service/tombstone/tombstone_svc_test.go
@@ -31,7 +31,7 @@
 )
 
 func init() {
-	err := datasource.Init(test.DbCfg)
+	err := datasource.Init(&test.DbCfg)
 	if err != nil {
 		panic(err)
 	}
diff --git a/eventbase/test/test.go b/eventbase/test/test.go
index 71f74a9..c2b690f 100644
--- a/eventbase/test/test.go
+++ b/eventbase/test/test.go
@@ -20,7 +20,7 @@
 import (
 	"time"
 
-	"github.com/go-chassis/cari/db"
+	"github.com/go-chassis/cari/db/config"
 	"github.com/go-chassis/go-archaius"
 
 	_ "github.com/apache/servicecomb-service-center/eventbase/bootstrap"
@@ -35,7 +35,7 @@
 	DefaultTestDBURI = "http://127.0.0.1:2379"
 )
 
-var DbCfg = db.Config{}
+var DbCfg = config.Config{}
 
 func init() {
 	err := archaius.Init(archaius.WithMemorySource(), archaius.WithENVSource())
diff --git a/go.mod b/go.mod
index 823b434..5518ba6 100644
--- a/go.mod
+++ b/go.mod
@@ -14,7 +14,7 @@
 	github.com/deckarep/golang-set v1.7.1
 	github.com/elithrar/simple-scrypt v1.3.0
 	github.com/ghodss/yaml v1.0.0
-	github.com/go-chassis/cari v0.5.1-0.20211229072151-7fa40d0919c6
+	github.com/go-chassis/cari v0.5.1-0.20220117143645-570968c7043d
 	github.com/go-chassis/foundation v0.4.0
 	github.com/go-chassis/go-archaius v1.5.1
 	github.com/go-chassis/go-chassis-extension/protocol/grpc v0.0.0-20210902082902-eb5df922afcd
@@ -174,7 +174,6 @@
 	google.golang.org/protobuf v1.27.1 // indirect
 	gopkg.in/cheggaaa/pb.v1 v1.0.25 // indirect
 	gopkg.in/inf.v0 v0.9.1 // indirect
-	gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22 // indirect
 	gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
 	gopkg.in/yaml.v2 v2.4.0 // indirect
 	gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
diff --git a/go.sum b/go.sum
index 26b68da..d521f6d 100644
--- a/go.sum
+++ b/go.sum
@@ -179,13 +179,11 @@
 github.com/go-chassis/cari v0.4.0/go.mod h1:av/19fqwEP4eOC8unL/z67AAbFDwXUCko6SKa4Avrd8=
 github.com/go-chassis/cari v0.5.0/go.mod h1:av/19fqwEP4eOC8unL/z67AAbFDwXUCko6SKa4Avrd8=
 github.com/go-chassis/cari v0.5.1-0.20210823023004-74041d1363c4/go.mod h1:av/19fqwEP4eOC8unL/z67AAbFDwXUCko6SKa4Avrd8=
-github.com/go-chassis/cari v0.5.1-0.20211227133501-53aa20cf7a44/go.mod h1:HG0Olv4sy/4e/3e9S0pofO0pzchaDjJ0hMweyFU7d5Q=
-github.com/go-chassis/cari v0.5.1-0.20211229072151-7fa40d0919c6 h1:7Ino94E57cnvKmyKVR0bDhZl/jcI+U2VMsHUD6qAvwg=
-github.com/go-chassis/cari v0.5.1-0.20211229072151-7fa40d0919c6/go.mod h1:HG0Olv4sy/4e/3e9S0pofO0pzchaDjJ0hMweyFU7d5Q=
+github.com/go-chassis/cari v0.5.1-0.20220117143645-570968c7043d h1:9G1yjs6+8wxtZ+Qy4sj6bchEysEnmBqdhGaDALUnfMQ=
+github.com/go-chassis/cari v0.5.1-0.20220117143645-570968c7043d/go.mod h1:tKTzguHTGohMCgkcWNZWtA4TwfcsJrIXpfYxsQtb7uw=
 github.com/go-chassis/foundation v0.2.2-0.20201210043510-9f6d3de40234/go.mod h1:2PjwqpVwYEVaAldl5A58a08viH8p27pNeYaiE3ZxOBA=
 github.com/go-chassis/foundation v0.2.2/go.mod h1:2PjwqpVwYEVaAldl5A58a08viH8p27pNeYaiE3ZxOBA=
 github.com/go-chassis/foundation v0.3.0/go.mod h1:2PjwqpVwYEVaAldl5A58a08viH8p27pNeYaiE3ZxOBA=
-github.com/go-chassis/foundation v0.3.1-0.20210806081520-3bd92d1ef787/go.mod h1:6NsIUaHghTFRGfCBcZN011zl196F6OR5QvD9N+P4oWU=
 github.com/go-chassis/foundation v0.4.0 h1:z0xETnSxF+vRXWjoIhOdzt6rywjZ4sB++utEl4YgWEY=
 github.com/go-chassis/foundation v0.4.0/go.mod h1:6NsIUaHghTFRGfCBcZN011zl196F6OR5QvD9N+P4oWU=
 github.com/go-chassis/go-archaius v1.5.1 h1:1FrNyzzmD6o6BIjPF8uQ4Cc+u7qYIgQTpDk8uopBqfo=
@@ -437,9 +435,6 @@
 github.com/ledisdb/ledisdb v0.0.0-20200510135210-d35789ec47e6/go.mod h1:n931TsDuKuq+uX4v1fulaMbA/7ZLLhjc85h7chZGBCQ=
 github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ictyFfxY=
 github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
-github.com/little-cui/etcdadpt v0.2.1/go.mod h1:727wftF2FS4vfkgFLmIvQue1XH+9u4lK2/hd6L7OAC8=
-github.com/little-cui/etcdadpt v0.3.1 h1:lAPIffcOR6jROu/mWf+zHscV8urIu1qbsJvwvziLWDY=
-github.com/little-cui/etcdadpt v0.3.1/go.mod h1:HnRRpIrVEVNWobkiCvG2EHLWKKZ+L047EcI29ma2zA4=
 github.com/little-cui/etcdadpt v0.3.2 h1:EBXPBxddZXTgWvGsIdAqqG6JCu1TouPNUhVVj9swt/s=
 github.com/little-cui/etcdadpt v0.3.2/go.mod h1:HnRRpIrVEVNWobkiCvG2EHLWKKZ+L047EcI29ma2zA4=
 github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
@@ -977,7 +972,6 @@
 gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=
 gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
 gopkg.in/ini.v1 v1.51.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
-gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22 h1:VpOs+IwYnYBaFnrNAeB8UUWtL3vEUnzSCL1nVjPhqrw=
 gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22/go.mod h1:yeKp02qBN3iKW1OzL3MGk2IdtZzaj7SFntXj72NppTA=
 gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8=
 gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
diff --git a/syncer/service/tombstone/tombstone_test.go b/syncer/service/tombstone/tombstone_test.go
index 2f3659e..5ae92b6 100644
--- a/syncer/service/tombstone/tombstone_test.go
+++ b/syncer/service/tombstone/tombstone_test.go
@@ -32,7 +32,7 @@
 )
 
 func init() {
-	err := datasource.Init(test.DbCfg)
+	err := datasource.Init(&test.DbCfg)
 	if err != nil {
 		panic(err)
 	}
diff --git a/test/test.go b/test/test.go
index 7a6fe44..00d2a22 100644
--- a/test/test.go
+++ b/test/test.go
@@ -22,19 +22,21 @@
 	"context"
 	"time"
 
+	"github.com/go-chassis/cari/db/config"
+	"github.com/go-chassis/go-archaius"
+	"github.com/little-cui/etcdadpt"
+
 	_ "github.com/apache/servicecomb-service-center/server/init"
 
 	_ "github.com/apache/servicecomb-service-center/eventbase/bootstrap"
 	_ "github.com/apache/servicecomb-service-center/server/bootstrap"
+	_ "github.com/apache/servicecomb-service-center/server/init"
 	_ "github.com/go-chassis/go-chassis-extension/protocol/grpc/server"
 
 	"github.com/apache/servicecomb-service-center/datasource"
 	edatasource "github.com/apache/servicecomb-service-center/eventbase/datasource"
 	"github.com/apache/servicecomb-service-center/server/metrics"
 	"github.com/apache/servicecomb-service-center/server/service/registry"
-	"github.com/go-chassis/cari/db"
-	"github.com/go-chassis/go-archaius"
-	"github.com/little-cui/etcdadpt"
 )
 
 func init() {
@@ -62,7 +64,7 @@
 	})
 	_ = metrics.Init(metrics.Options{})
 
-	_ = edatasource.Init(db.Config{
+	_ = edatasource.Init(&config.Config{
 		Kind:    kind,
 		URI:     uri,
 		Timeout: 10 * time.Second,