blob: 7f45507e5c986de74eea9156a1bdbb7f619de090 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package task
import (
"context"
"encoding/json"
"github.com/apache/servicecomb-service-center/eventbase/datasource"
"github.com/apache/servicecomb-service-center/eventbase/datasource/etcd/key"
"github.com/go-chassis/cari/sync"
"github.com/go-chassis/openlog"
"github.com/little-cui/etcdadpt"
)
// Dao is the etcd-backed data access object for sync tasks. It holds no
// fields; its methods operate through the etcdadpt package functions.
type Dao struct{}
// Create stores task in etcd under the key built from its domain, project,
// ID and timestamp.
//
// The task is serialized as JSON and written with etcdadpt.InsertBytes. If the
// insert reports that nothing was written, datasource.ErrTaskAlreadyExists is
// returned. On success the same task pointer that was passed in is returned.
func (d *Dao) Create(ctx context.Context, task *sync.Task) (*sync.Task, error) {
	payload, err := json.Marshal(task)
	if err != nil {
		return nil, err
	}

	taskKey := key.TaskKey(task.Domain, task.Project, task.ID, task.Timestamp)
	inserted, err := etcdadpt.InsertBytes(ctx, taskKey, payload)
	switch {
	case err != nil:
		return nil, err
	case !inserted:
		return nil, datasource.ErrTaskAlreadyExists
	}
	return task, nil
}
// Update replaces the status of a stored task.
//
// The record is looked up by the key built from task's domain, project, ID
// and timestamp. Only the Status field of the stored record is overwritten
// with task.Status; the record is then re-encoded and written back under the
// same key. datasource.ErrTaskNotExists is returned when the lookup yields no
// record.
//
// NOTE(review): the read and the write are two separate etcdadpt calls and
// nothing in this method guards against a concurrent writer in between.
func (d *Dao) Update(ctx context.Context, task *sync.Task) error {
	taskKey := key.TaskKey(task.Domain, task.Project, task.ID, task.Timestamp)

	kv, err := etcdadpt.Get(ctx, taskKey)
	if err != nil {
		return err
	}
	if kv == nil {
		return datasource.ErrTaskNotExists
	}

	stored := sync.Task{}
	if err = json.Unmarshal(kv.Value, &stored); err != nil {
		return err
	}

	// Only the status is taken from the caller; everything else stays as stored.
	stored.Status = task.Status

	encoded, err := json.Marshal(stored)
	if err != nil {
		return err
	}
	return etcdadpt.PutBytes(ctx, taskKey, encoded)
}
// Delete removes the given tasks from etcd.
//
// One delete operation is built per task, keyed by the task's domain,
// project, ID and timestamp, and all operations are submitted together in a
// single etcdadpt.Txn call whose error is returned as-is.
func (d *Dao) Delete(ctx context.Context, tasks ...*sync.Task) error {
	ops := make([]etcdadpt.OpOptions, 0, len(tasks))
	for _, t := range tasks {
		taskKey := key.TaskKey(t.Domain, t.Project, t.ID, t.Timestamp)
		ops = append(ops, etcdadpt.OpDel(etcdadpt.WithStrKey(taskKey)))
	}
	return etcdadpt.Txn(ctx, ops)
}
// List returns the stored tasks that match the given find options.
//
// The options are applied on top of datasource.NewTaskFindOptions(). All
// entries under key.TaskList(domain, project) are fetched from etcd, decoded
// from JSON and kept only if they pass filterMatch.
//
// An entry that cannot be decoded is logged and skipped rather than failing
// the whole call. The returned slice is never nil; when the etcd listing
// fails it is empty and accompanied by the error.
//
// The receiver is a pointer so that List agrees with the other Dao methods.
func (d *Dao) List(ctx context.Context, options ...datasource.TaskFindOption) ([]*sync.Task, error) {
	opts := datasource.NewTaskFindOptions()
	for _, apply := range options {
		apply(&opts)
	}

	tasks := make([]*sync.Task, 0)
	kvs, _, err := etcdadpt.List(ctx, key.TaskList(opts.Domain, opts.Project))
	if err != nil {
		return tasks, err
	}

	for _, kv := range kvs {
		// A fresh value per iteration, so each appended pointer is distinct.
		task := sync.Task{}
		if err := json.Unmarshal(kv.Value, &task); err != nil {
			// Best effort: one undecodable record must not hide the others.
			datasource.Logger().Error("unmarshal task failed", openlog.WithErr(err))
			continue
		}
		if !filterMatch(&task, opts) {
			continue
		}
		tasks = append(tasks, &task)
	}
	return tasks, nil
}
// filterMatch reports whether task satisfies the Action, ResourceType and
// Status criteria in options. A criterion left as the empty string is not
// checked; every criterion that is set must equal the corresponding task
// field.
func filterMatch(task *sync.Task, options datasource.TaskFindOptions) bool {
	switch {
	case options.Action != "" && options.Action != task.Action:
		return false
	case options.ResourceType != "" && options.ResourceType != task.ResourceType:
		return false
	case options.Status != "" && options.Status != task.Status:
		return false
	}
	return true
}