blob: 7deef48cd9b7b631d066e56964f9af5b375b7c50 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package task
import (
"context"
"github.com/go-chassis/cari/sync"
"github.com/go-chassis/openlog"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
mopts "go.mongodb.org/mongo-driver/mongo/options"
"servicecomb-service-center/eventbase/datasource"
dmongo "servicecomb-service-center/eventbase/datasource/mongo"
"servicecomb-service-center/eventbase/datasource/mongo/client"
)
type Dao struct {
}
func (d *Dao) Create(ctx context.Context, task *sync.Task) (*sync.Task, error) {
collection := client.GetMongoClient().GetDB().Collection(dmongo.CollectionTask)
_, err := collection.InsertOne(ctx, task)
if err != nil {
openlog.Error("fail to create task" + err.Error())
return nil, err
}
return task, nil
}
func (d *Dao) Update(ctx context.Context, task *sync.Task) error {
collection := client.GetMongoClient().GetDB().Collection(dmongo.CollectionTask)
result, err := collection.UpdateOne(ctx,
bson.M{dmongo.ColumnTaskID: task.TaskID, dmongo.ColumnDomain: task.Domain, dmongo.ColumnProject: task.Project, dmongo.ColumnTimestamp: task.Timestamp},
bson.D{{Key: "$set", Value: bson.D{
{Key: dmongo.ColumnStatus, Value: task.Status}}},
})
if err != nil {
openlog.Error("fail to update task" + err.Error())
return err
}
if result.ModifiedCount == 0 {
openlog.Error("fail to update task" + datasource.ErrTaskNotExists.Error())
return datasource.ErrTaskNotExists
}
return nil
}
func (d *Dao) Delete(ctx context.Context, tasks ...*sync.Task) error {
tasksIDs := make([]string, len(tasks))
filter := bson.A{}
for i, task := range tasks {
tasksIDs[i] = task.TaskID
dFilter := bson.D{
{dmongo.ColumnDomain, task.Domain},
{dmongo.ColumnProject, task.Project},
{dmongo.ColumnTaskID, task.TaskID},
{dmongo.ColumnTimestamp, task.Timestamp},
}
filter = append(filter, dFilter)
}
var deleteFunc = func(sessionContext mongo.SessionContext) error {
collection := client.GetMongoClient().GetDB().Collection(dmongo.CollectionTask)
_, err := collection.DeleteMany(sessionContext, bson.M{"$or": filter})
return err
}
err := client.GetMongoClient().ExecTxn(ctx, deleteFunc)
if err != nil {
openlog.Error(err.Error())
}
return err
}
func (d *Dao) List(ctx context.Context, domain string, project string, options ...datasource.TaskFindOption) ([]*sync.Task, error) {
opts := datasource.NewTaskFindOptions()
for _, o := range options {
o(&opts)
}
collection := client.GetMongoClient().GetDB().Collection(dmongo.CollectionTask)
filter := bson.M{dmongo.ColumnDomain: domain, dmongo.ColumnProject: project}
if opts.Action != "" {
filter[dmongo.ColumnAction] = opts.Action
}
if opts.DataType != "" {
filter[dmongo.ColumnDataType] = opts.DataType
}
if opts.Status != "" {
filter[dmongo.ColumnStatus] = opts.Status
}
opt := mopts.Find().SetSort(map[string]interface{}{
dmongo.ColumnTimestamp: 1,
})
cur, err := collection.Find(ctx, filter, opt)
if err != nil {
return nil, err
}
defer cur.Close(ctx)
tasks := make([]*sync.Task, 0)
for cur.Next(ctx) {
task := &sync.Task{}
if err := cur.Decode(task); err != nil {
openlog.Error("decode to task error: " + err.Error())
return nil, err
}
tasks = append(tasks, task)
}
return tasks, nil
}