blob: 7deef48cd9b7b631d066e56964f9af5b375b7c50 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package task
import (
mopts ""
dmongo "servicecomb-service-center/eventbase/datasource/mongo"
type Dao struct {
func (d *Dao) Create(ctx context.Context, task *sync.Task) (*sync.Task, error) {
collection := client.GetMongoClient().GetDB().Collection(dmongo.CollectionTask)
_, err := collection.InsertOne(ctx, task)
if err != nil {
openlog.Error("fail to create task" + err.Error())
return nil, err
return task, nil
func (d *Dao) Update(ctx context.Context, task *sync.Task) error {
collection := client.GetMongoClient().GetDB().Collection(dmongo.CollectionTask)
result, err := collection.UpdateOne(ctx,
bson.M{dmongo.ColumnTaskID: task.TaskID, dmongo.ColumnDomain: task.Domain, dmongo.ColumnProject: task.Project, dmongo.ColumnTimestamp: task.Timestamp},
bson.D{{Key: "$set", Value: bson.D{
{Key: dmongo.ColumnStatus, Value: task.Status}}},
if err != nil {
openlog.Error("fail to update task" + err.Error())
return err
if result.ModifiedCount == 0 {
openlog.Error("fail to update task" + datasource.ErrTaskNotExists.Error())
return datasource.ErrTaskNotExists
return nil
func (d *Dao) Delete(ctx context.Context, tasks ...*sync.Task) error {
tasksIDs := make([]string, len(tasks))
filter := bson.A{}
for i, task := range tasks {
tasksIDs[i] = task.TaskID
dFilter := bson.D{
{dmongo.ColumnDomain, task.Domain},
{dmongo.ColumnProject, task.Project},
{dmongo.ColumnTaskID, task.TaskID},
{dmongo.ColumnTimestamp, task.Timestamp},
filter = append(filter, dFilter)
var deleteFunc = func(sessionContext mongo.SessionContext) error {
collection := client.GetMongoClient().GetDB().Collection(dmongo.CollectionTask)
_, err := collection.DeleteMany(sessionContext, bson.M{"$or": filter})
return err
err := client.GetMongoClient().ExecTxn(ctx, deleteFunc)
if err != nil {
return err
func (d *Dao) List(ctx context.Context, domain string, project string, options ...datasource.TaskFindOption) ([]*sync.Task, error) {
opts := datasource.NewTaskFindOptions()
for _, o := range options {
collection := client.GetMongoClient().GetDB().Collection(dmongo.CollectionTask)
filter := bson.M{dmongo.ColumnDomain: domain, dmongo.ColumnProject: project}
if opts.Action != "" {
filter[dmongo.ColumnAction] = opts.Action
if opts.DataType != "" {
filter[dmongo.ColumnDataType] = opts.DataType
if opts.Status != "" {
filter[dmongo.ColumnStatus] = opts.Status
opt := mopts.Find().SetSort(map[string]interface{}{
dmongo.ColumnTimestamp: 1,
cur, err := collection.Find(ctx, filter, opt)
if err != nil {
return nil, err
defer cur.Close(ctx)
tasks := make([]*sync.Task, 0)
for cur.Next(ctx) {
task := &sync.Task{}
if err := cur.Decode(task); err != nil {
openlog.Error("decode to task error: " + err.Error())
return nil, err
tasks = append(tasks, task)
return tasks, nil