blob: c5ae9bd71f849b845316dcefd8ae78fd08d6dbe5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dao
import (
"context"
"fmt"
"github.com/apache/servicecomb-kie/pkg/model"
"github.com/go-mesh/openlogging"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"time"
)
// client is the package-level MongoDB client. getClient creates it on first use
// and returns the same instance on later calls.
var client *mongo.Client
// Database/collection names and defaults used by the DAO methods in this file.
const (
// DB is the MongoDB database name passed to Client.Database.
DB = "kie"
// CollectionLabel is the collection queried for label documents.
CollectionLabel = "label"
// CollectionKV is the collection queried for key-value documents.
CollectionKV = "kv"
// CollectionLabelRevision is not referenced in the visible code; presumably it
// names the collection holding label revisions — TODO confirm against its users.
CollectionLabelRevision = "label_revision"
// DefaultTimeout is the timeout applied to the per-call contexts in this file
// and the fallback used by NewMongoService when Options.Timeout is zero.
DefaultTimeout = 5 * time.Second
// DefaultValueType is the ValueType CreateOrUpdate assigns when a KV has none.
DefaultValueType = "text"
)
// MongodbService provides key-value and label operations backed by MongoDB.
// Instances are built by NewMongoService.
type MongodbService struct {
c *mongo.Client // client used for every database/collection access
timeout time.Duration // configured timeout; the visible code only reports it in timeout error messages
}
// CreateOrUpdate stores kv under domain.
//
// The label set in kv.Labels is resolved to a label document first and is
// created when it does not exist yet. The key is then looked up within that
// label: if it is missing a new key is created, otherwise the existing key is
// updated and kv.Revision is set to the new revision.
//
// kv is modified in place (LabelID, Domain, ValueType, ID, Revision).
// It returns ErrMissingDomain when domain is empty, and otherwise any error
// reported by the underlying label/key operations.
func (s *MongodbService) CreateOrUpdate(ctx context.Context, domain string, kv *model.KVDoc) (*model.KVDoc, error) {
	if domain == "" {
		return nil, ErrMissingDomain
	}
	// Always release the timeout context; discarding the cancel func keeps the
	// timer alive until the deadline fires.
	ctx, cancel := context.WithTimeout(ctx, DefaultTimeout)
	defer cancel()

	// check whether the labels exist, create them on demand
	labelID, err := s.LabelsExist(ctx, domain, kv.Labels)
	if err != nil {
		if err != ErrLabelNotExists {
			return nil, err
		}
		l, createErr := s.createLabel(ctx, domain, kv.Labels)
		if createErr != nil {
			return nil, createErr
		}
		labelID = l.ID
	}

	kv.LabelID = labelID.Hex()
	kv.Domain = domain
	if kv.ValueType == "" {
		kv.ValueType = DefaultValueType
	}

	keyID, err := s.KVExist(ctx, domain, kv.Key, WithLabelID(kv.LabelID))
	if err != nil {
		if err != ErrKeyNotExists {
			return nil, err
		}
		// the key is new under this label set
		created, createErr := s.createKey(ctx, kv)
		if createErr != nil {
			return nil, createErr
		}
		return created, nil
	}

	// the key already exists: update it and report the new revision
	kv.ID = keyID
	revision, err := s.updateKey(ctx, kv)
	if err != nil {
		return nil, err
	}
	kv.Revision = revision
	return kv, nil
}
// FindLabels finds the label doc in domain whose label set exactly matches labels.
// If labels is empty, it looks up the default label doc, which has no labels.
//
// The query matches every doc containing all the given labels; the exact match
// is then selected by comparing the number of labels. The returned doc has its
// Labels field cleared. It returns ErrLabelNotExists when nothing matches
// exactly.
func (s *MongodbService) FindLabels(ctx context.Context, domain string, labels map[string]string) (*model.LabelDoc, error) {
	// Earlier versions ignored ctx entirely, so a nil ctx is still tolerated.
	if ctx == nil {
		ctx = context.Background()
	}
	// Derive from the caller's ctx so its cancellation is honoured, and always
	// release the timeout context.
	ctx, cancel := context.WithTimeout(ctx, DefaultTimeout)
	defer cancel()

	collection := s.c.Database(DB).Collection(CollectionLabel)
	filter := bson.M{"domain": domain}
	for k, v := range labels {
		filter["labels."+k] = v
	}
	if len(labels) == 0 {
		filter["labels"] = "default" // allow key without labels
	}
	cur, err := collection.Find(ctx, filter)
	if err != nil {
		if err.Error() == context.DeadlineExceeded.Error() {
			return nil, ErrAction("find label", filter, fmt.Errorf("can not reach mongodb in %s", s.timeout))
		}
		return nil, err
	}
	defer cur.Close(ctx)
	// Return the cursor's own error; the outer err is nil at this point.
	if err := cur.Err(); err != nil {
		return nil, err
	}
	openlogging.Debug(fmt.Sprintf("find lables [%s] in [%s]", labels, domain))
	curLabel := &model.LabelDoc{} // reuse this pointer to reduce GC, only clear label
	// check label length to get the exact match
	for cur.Next(ctx) { // O(n), but there will not be that many labels
		curLabel.Labels = nil
		if err := cur.Decode(curLabel); err != nil {
			openlogging.Error("decode error: " + err.Error())
			return nil, err
		}
		if len(curLabel.Labels) == len(labels) {
			openlogging.Debug("hit exact labels")
			curLabel.Labels = nil // exact match doesn't need to return labels
			return curLabel, nil
		}
	}
	// Next returns false on failure as well as on exhaustion; do not report an
	// iteration failure as "label does not exist".
	if err := cur.Err(); err != nil {
		return nil, err
	}
	return nil, ErrLabelNotExists
}
// findKeys returns every KV document in the kv collection that matches filter.
// When withoutLabel is true the Labels field of each returned doc is cleared.
// It returns ErrKeyNotExists when the query matches nothing.
func (s *MongodbService) findKeys(ctx context.Context, filter bson.M, withoutLabel bool) ([]*model.KVDoc, error) {
	collection := s.c.Database(DB).Collection(CollectionKV)
	cur, err := collection.Find(ctx, filter)
	if err != nil {
		if err.Error() == context.DeadlineExceeded.Error() {
			return nil, ErrAction("find", filter, fmt.Errorf("can not reach mongodb in %s", s.timeout))
		}
		return nil, err
	}
	defer cur.Close(ctx)
	// Return the cursor's own error; the outer err is nil at this point.
	if err := cur.Err(); err != nil {
		return nil, err
	}
	kvs := make([]*model.KVDoc, 0)
	for cur.Next(ctx) {
		// Each result needs its own document: appending one shared pointer
		// would make every element alias the last decoded KV.
		curKV := &model.KVDoc{}
		if err := cur.Decode(curKV); err != nil {
			openlogging.Error("decode to KVs error: " + err.Error())
			return nil, err
		}
		if withoutLabel {
			curKV.Labels = nil
		}
		kvs = append(kvs, curKV)
	}
	// Do not report an iteration failure as a missing key or a partial result.
	if err := cur.Err(); err != nil {
		return nil, err
	}
	if len(kvs) == 0 {
		return nil, ErrKeyNotExists
	}
	return kvs, nil
}
// FindKVByLabelID gets kvs by key and label id within domain.
// key can be empty, in which case all key values of the label are returned
// (without their labels). If key is given, 0-1 key value is returned.
func (s *MongodbService) FindKVByLabelID(ctx context.Context, domain, labelID, key string) ([]*model.KVDoc, error) {
	// Earlier versions ignored ctx entirely, so a nil ctx is still tolerated.
	if ctx == nil {
		ctx = context.Background()
	}
	// Derive from the caller's ctx so its cancellation is honoured, and always
	// release the timeout context.
	ctx, cancel := context.WithTimeout(ctx, DefaultTimeout)
	defer cancel()

	filter := bson.M{"label_id": labelID, "domain": domain}
	if key != "" {
		return s.findOneKey(ctx, filter, key)
	}
	return s.findKeys(ctx, filter, true)
}
// FindKV gets kvs in domain by key and labels, as configured through options.
// Because labels have a lot of combinations, the query matches every kv that
// contains all the requested labels. Use WithExactLabels to return only the one
// kv whose label set matches the criteria exactly; that kv is returned with its
// Labels field cleared.
//
// It returns ErrMissingDomain when domain is empty and ErrKeyNotExists when
// nothing matches.
func (s *MongodbService) FindKV(ctx context.Context, domain string, options ...FindOption) ([]*model.KVDoc, error) {
	opts := FindOptions{}
	for _, o := range options {
		o(&opts)
	}
	if domain == "" {
		return nil, ErrMissingDomain
	}
	collection := s.c.Database(DB).Collection(CollectionKV)
	// Always release the timeout context instead of discarding the cancel func.
	ctx, cancel := context.WithTimeout(ctx, DefaultTimeout)
	defer cancel()

	filter := bson.M{"domain": domain}
	if opts.Key != "" {
		filter["key"] = opts.Key
	}
	for k, v := range opts.Labels {
		filter["labels."+k] = v
	}
	cur, err := collection.Find(ctx, filter)
	if err != nil {
		if err.Error() == context.DeadlineExceeded.Error() {
			return nil, ErrAction("find", filter, fmt.Errorf("can not reach mongodb in %s", s.timeout))
		}
		return nil, err
	}
	defer cur.Close(ctx)
	// Return the cursor's own error; the outer err is nil at this point.
	if err := cur.Err(); err != nil {
		return nil, err
	}

	if opts.ExactLabels {
		openlogging.Debug(fmt.Sprintf("find one [%s] with lables [%s] in [%s]", opts.Key, opts.Labels, domain))
		curKV := &model.KVDoc{} // reuse this pointer to reduce GC, only clear label
		// check label length to get the exact match
		for cur.Next(ctx) { // O(n), but there will not be that many labels for one key
			curKV.Labels = nil
			if err := cur.Decode(curKV); err != nil {
				openlogging.Error("decode error: " + err.Error())
				return nil, err
			}
			if len(curKV.Labels) == len(opts.Labels) {
				openlogging.Debug("hit exact labels")
				curKV.Labels = nil // exact match doesn't need to return labels
				return []*model.KVDoc{curKV}, nil
			}
		}
		// Do not report an iteration failure as a missing key.
		if err := cur.Err(); err != nil {
			return nil, err
		}
		return nil, ErrKeyNotExists
	}

	kvs := make([]*model.KVDoc, 0)
	for cur.Next(ctx) {
		curKV := &model.KVDoc{}
		if err := cur.Decode(curKV); err != nil {
			openlogging.Error("decode to KVs error: " + err.Error())
			return nil, err
		}
		kvs = append(kvs, curKV)
	}
	// Do not report an iteration failure as a missing key or a partial result.
	if err := cur.Err(); err != nil {
		return nil, err
	}
	if len(kvs) == 0 {
		return nil, ErrKeyNotExists
	}
	return kvs, nil
}
// DeleteByID deletes the KV document whose _id is the given hex string.
// It returns an error when id is not a valid ObjectID hex or when the delete
// operation fails. Deleting an id that does not exist only logs a warning.
func (s *MongodbService) DeleteByID(id string) error {
	collection := s.c.Database(DB).Collection(CollectionKV)
	hex, err := primitive.ObjectIDFromHex(id)
	if err != nil {
		openlogging.Error(fmt.Sprintf("convert %s ,err:%s", id, err))
		return err
	}
	ctx, cancel := context.WithTimeout(context.Background(), DefaultTimeout)
	defer cancel()
	dr, err := collection.DeleteOne(ctx, bson.M{"_id": hex})
	if err != nil {
		openlogging.Error(fmt.Sprintf("delete [%s] failed: %s", hex, err))
		// dr is nil on failure: report the error instead of dereferencing it.
		return err
	}
	if dr.DeletedCount != 1 {
		openlogging.Warn(fmt.Sprintf("delete [%s], but it is not exist", hex))
	}
	return nil
}
// Delete removes the KV documents with the given hex ids from domain.
//
// Blank entries in ids are skipped with a warning. An empty ids slice, or one
// containing only blank entries, is a no-op that returns nil. It returns
// ErrMissingDomain when domain is empty, and an error when an id is not a valid
// ObjectID hex or the delete operation fails. A mismatch between the number of
// requested and deleted documents is only logged.
func (s *MongodbService) Delete(ids []string, domain string) error {
	if len(ids) == 0 {
		openlogging.Warn("delete error,ids is blank")
		return nil
	}
	if domain == "" {
		return ErrMissingDomain
	}
	collection := s.c.Database(DB).Collection(CollectionKV)
	// convert the hex ids to ObjectIDs
	oid := make([]primitive.ObjectID, 0, len(ids))
	for _, v := range ids {
		if v == "" {
			openlogging.Warn("ids contains continuous ','")
			continue
		}
		hex, err := primitive.ObjectIDFromHex(v)
		if err != nil {
			openlogging.Error(fmt.Sprintf("convert %s ,err:%s", v, err))
			return err
		}
		oid = append(oid, hex)
	}
	// Nothing valid left to delete: treat it like an empty ids slice rather
	// than sending an empty $in to the server.
	if len(oid) == 0 {
		openlogging.Warn("delete error,ids is blank")
		return nil
	}
	// use $in filter, restricted to the domain
	filter := &bson.M{"_id": &bson.M{"$in": oid}, "domain": domain}
	ctx, cancel := context.WithTimeout(context.Background(), DefaultTimeout)
	defer cancel()
	dr, err := collection.DeleteMany(ctx, filter)
	// check error and delete number
	if err != nil {
		openlogging.Error(fmt.Sprintf("delete [%v] failed : [%s]", filter, err))
		return err
	}
	if dr.DeletedCount != int64(len(oid)) {
		openlogging.Warn(fmt.Sprintf(" The actual number of deletions[%d] is not equal to the parameters[%d].", dr.DeletedCount, len(oid)))
	} else {
		openlogging.Info(fmt.Sprintf("delete success,count=%d", dr.DeletedCount))
	}
	return nil
}
// NewMongoService builds a MongodbService from opts.
// A zero opts.Timeout is replaced by DefaultTimeout before the client is
// obtained. Any error from getClient is returned unchanged.
func NewMongoService(opts Options) (*MongodbService, error) {
	// fall back to the package default when the caller leaves Timeout unset
	if opts.Timeout == 0 {
		opts.Timeout = DefaultTimeout
	}

	mongoClient, connErr := getClient(opts)
	if connErr != nil {
		return nil, connErr
	}

	return &MongodbService{timeout: opts.Timeout, c: mongoClient}, nil
}
// getClient returns the package-level MongoDB client, creating and connecting
// it from opts.URI on first use. Later calls return the cached client and
// ignore opts.
//
// The client is cached only after Connect succeeds, so a failed attempt is
// retried on the next call instead of handing out an unconnected client.
//
// NOTE(review): opts.URI is logged verbatim; if it can carry credentials,
// consider redacting it.
func getClient(opts Options) (*mongo.Client, error) {
	if client != nil {
		return client, nil
	}
	c, err := mongo.NewClient(options.Client().ApplyURI(opts.URI))
	if err != nil {
		return nil, err
	}
	openlogging.Info("connecting to " + opts.URI)
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	if err := c.Connect(ctx); err != nil {
		return nil, err
	}
	openlogging.Info("connected to " + opts.URI)
	client = c
	return client, nil
}