blob: d156a177de4597144df1bcaca64211520d813af8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package mongo
import (
"strings"
"github.com/go-chassis/cari/db"
"github.com/go-chassis/openlog"
"go.mongodb.org/mongo-driver/bson"
"gopkg.in/mgo.v2"
"servicecomb-service-center/eventbase/datasource"
"servicecomb-service-center/eventbase/datasource/mongo/client"
"servicecomb-service-center/eventbase/datasource/mongo/task"
"servicecomb-service-center/eventbase/datasource/mongo/tombstone"
)
type Datasource struct {
taskDao datasource.TaskDao
tombstone datasource.TombstoneDao
}
func (d *Datasource) TaskDao() datasource.TaskDao {
return d.taskDao
}
func (d *Datasource) TombstoneDao() datasource.TombstoneDao {
return d.tombstone
}
func NewDatasource(config *db.Config) (datasource.DataSource, error) {
inst := &Datasource{}
inst.taskDao = &task.Dao{}
inst.tombstone = &tombstone.Dao{}
return inst, inst.initialize(config)
}
func (d *Datasource) initialize(config *db.Config) error {
err := d.initClient(config)
if err != nil {
return err
}
ensureDB(config)
return nil
}
func (d *Datasource) initClient(config *db.Config) error {
client.NewMongoClient(config)
select {
case err := <-client.GetMongoClient().Err():
return err
case <-client.GetMongoClient().Ready():
return nil
}
}
func init() {
datasource.RegisterPlugin("mongo", NewDatasource)
}
func ensureDB(config *db.Config) {
session := openSession(config)
defer session.Close()
session.SetMode(mgo.Primary, true)
ensureTask(session)
ensureTombstone(session)
}
func openSession(c *db.Config) *mgo.Session {
timeout := c.Timeout
var err error
session, err := mgo.DialWithTimeout(c.URI, timeout)
if err != nil {
openlog.Warn("can not dial db, retry once:" + err.Error())
session, err = mgo.DialWithTimeout(c.URI, timeout)
if err != nil {
openlog.Fatal("can not dial db:" + err.Error())
}
}
return session
}
func wrapError(err error, skipMsg ...string) {
if err != nil {
for _, str := range skipMsg {
if strings.Contains(err.Error(), str) {
openlog.Debug(err.Error())
return
}
}
openlog.Error(err.Error())
}
}
func ensureTask(session *mgo.Session) {
c := session.DB(DBName).C(CollectionTask)
err := c.Create(&mgo.CollectionInfo{Validator: bson.M{
ColumnTaskID: bson.M{"$exists": true},
ColumnDomain: bson.M{"$exists": true},
ColumnProject: bson.M{"$exists": true},
ColumnTimestamp: bson.M{"$exists": true},
}})
wrapError(err)
err = c.EnsureIndex(mgo.Index{
Key: []string{ColumnDomain, ColumnProject, ColumnTaskID, ColumnTimestamp},
Unique: true,
})
wrapError(err)
}
func ensureTombstone(session *mgo.Session) {
c := session.DB(DBName).C(CollectionTombstone)
err := c.Create(&mgo.CollectionInfo{Validator: bson.M{
ColumnResourceID: bson.M{"$exists": true},
ColumnDomain: bson.M{"$exists": true},
ColumnProject: bson.M{"$exists": true},
ColumnResourceType: bson.M{"$exists": true},
}})
wrapError(err)
err = c.EnsureIndex(mgo.Index{
Key: []string{ColumnDomain, ColumnProject, ColumnResourceID, ColumnResourceType},
Unique: true,
})
wrapError(err)
}