blob: 5a163fb46a5dc102bc0e868491614afeb19493d5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Package v1 hold http rest v1 API
package v1
import (
"context"
"encoding/json"
"fmt"
"net/http"
"time"
goRestful "github.com/emicklei/go-restful"
"github.com/go-chassis/cari/config"
"github.com/go-chassis/foundation/validator"
"github.com/go-chassis/go-chassis/v2/server/restful"
"github.com/go-chassis/openlog"
"github.com/apache/servicecomb-kie/pkg/common"
"github.com/apache/servicecomb-kie/pkg/model"
"github.com/apache/servicecomb-kie/server/datasource"
"github.com/apache/servicecomb-kie/server/pubsub"
kvsvc "github.com/apache/servicecomb-kie/server/service/kv"
)
// KVResource has API about kv operations
type KVResource struct {
}
// Upload upload kvs
func (r *KVResource) Upload(rctx *restful.Context) {
if rctx.ReadQueryParameter(common.QueryParamOverride) == "" {
WriteErrResponse(rctx, config.ErrInvalidParams, "Query parameter 'override' is required")
return
}
var err error
inputUpload := new(KVUploadBody)
if err = readRequest(rctx, &inputUpload); err != nil {
WriteErrResponse(rctx, config.ErrInvalidParams, fmt.Sprintf(FmtReadRequestError, err))
return
}
result := kvsvc.Upload(rctx.Ctx, &model.UploadKVRequest{
Domain: ReadDomain(rctx.Ctx),
Project: rctx.ReadPathParameter(common.PathParameterProject),
KVs: inputUpload.Data,
Override: rctx.ReadQueryParameter(common.QueryParamOverride),
})
err = writeResponse(rctx, result)
if err != nil {
openlog.Error(err.Error())
}
}
// Post create a kv
func (r *KVResource) Post(rctx *restful.Context) {
var err error
kv := new(model.KVDoc)
if err = readRequest(rctx, kv); err != nil {
WriteErrResponse(rctx, config.ErrInvalidParams, fmt.Sprintf(FmtReadRequestError, err))
return
}
kv.Domain = ReadDomain(rctx.Ctx)
kv.Project = rctx.ReadPathParameter(common.PathParameterProject)
kv, postErr := kvsvc.Create(rctx.Ctx, kv)
if postErr != nil {
openlog.Error(fmt.Sprintf("post kv [%v] failed, err:%s", kv, postErr.Error()))
WriteErrResponse(rctx, postErr.Code, postErr.Error())
return
}
kvsvc.Publish(kv)
err = writeResponse(rctx, kv)
if err != nil {
openlog.Error(err.Error())
}
}
// Put update a kv
func (r *KVResource) Put(rctx *restful.Context) {
var err error
kvID := rctx.ReadPathParameter(common.PathParamKVID)
project := rctx.ReadPathParameter(common.PathParameterProject)
kvReq := new(model.UpdateKVRequest)
if err = readRequest(rctx, kvReq); err != nil {
WriteErrResponse(rctx, config.ErrInvalidParams, fmt.Sprintf(FmtReadRequestError, err))
return
}
domain := ReadDomain(rctx.Ctx)
kvReq.ID = kvID
kvReq.Domain = domain
kvReq.Project = project
err = validator.Validate(kvReq)
if err != nil {
WriteErrResponse(rctx, config.ErrInvalidParams, err.Error())
return
}
kv, err := kvsvc.Update(rctx.Ctx, kvReq)
if err != nil {
openlog.Error(fmt.Sprintf("put [%s] err:%s", kvID, err.Error()))
if err == datasource.ErrKeyNotExists {
WriteErrResponse(rctx, config.ErrRecordNotExists, err.Error())
return
}
WriteError(rctx, err)
return
}
err = pubsub.Publish(&pubsub.KVChangeEvent{
Key: kv.Key,
Labels: kv.Labels,
Project: project,
DomainID: kv.Domain,
Action: pubsub.ActionPut,
})
if err != nil {
openlog.Warn("lost kv change event when put:" + err.Error())
}
openlog.Info(
fmt.Sprintf("put [%s] success", kvID))
err = writeResponse(rctx, kv)
if err != nil {
openlog.Error(err.Error())
}
}
// Get search key by kv id
func (r *KVResource) Get(rctx *restful.Context) {
request := &model.GetKVRequest{
Project: rctx.ReadPathParameter(common.PathParameterProject),
Domain: ReadDomain(rctx.Ctx),
ID: rctx.ReadPathParameter(common.PathParamKVID),
}
err := validator.Validate(request)
if err != nil {
WriteErrResponse(rctx, config.ErrInvalidParams, err.Error())
return
}
kv, err := kvsvc.Get(rctx.Ctx, request)
if err != nil {
openlog.Error("kv_resource: " + err.Error())
if err == datasource.ErrKeyNotExists {
WriteErrResponse(rctx, config.ErrRecordNotExists, err.Error())
return
}
WriteError(rctx, err)
return
}
kv.Domain = ""
kv.Project = ""
err = writeResponse(rctx, kv)
if err != nil {
openlog.Error(err.Error())
}
}
// List response kv list
func (r *KVResource) List(rctx *restful.Context) {
var err error
request := &model.ListKVRequest{
Project: rctx.ReadPathParameter(common.PathParameterProject),
Domain: ReadDomain(rctx.Ctx),
Key: rctx.ReadQueryParameter(common.QueryParamKey),
Value: rctx.ReadQueryParameter(common.QueryParamValue),
Status: rctx.ReadQueryParameter(common.QueryParamStatus),
Match: getMatchPattern(rctx),
}
labels, err := getLabels(rctx)
if err != nil {
WriteErrResponse(rctx, config.ErrInvalidParams, common.MsgIllegalLabels)
return
}
request.Labels = labels
offsetStr := rctx.ReadQueryParameter(common.QueryParamOffset)
limitStr := rctx.ReadQueryParameter(common.QueryParamLimit)
offset, limit, err := checkPagination(offsetStr, limitStr)
if err != nil {
WriteErrResponse(rctx, config.ErrInvalidParams, err.Error())
return
}
request.Offset = offset
request.Limit = limit
err = validator.Validate(request)
if err != nil {
WriteErrResponse(rctx, config.ErrInvalidParams, err.Error())
return
}
returnData(rctx, request)
}
func returnData(rctx *restful.Context, request *model.ListKVRequest) {
revStr := rctx.ReadQueryParameter(common.QueryParamRev)
wait := rctx.ReadQueryParameter(common.QueryParamWait)
if wait != "" {
duration, err := time.ParseDuration(wait)
if err != nil {
WriteErrResponse(rctx, config.ErrInvalidParams, err.Error())
return
}
var cancel context.CancelFunc
rctx.Ctx, cancel = context.WithTimeout(rctx.Ctx, duration)
defer cancel()
}
if revStr == "" {
if wait == "" {
queryAndResponse(rctx, request)
return
}
if !isLegalWaitRequest(rctx, request) {
return
}
if watch(rctx, request, wait) {
return
}
rctx.WriteHeader(http.StatusNotModified)
return
}
revised, err := revNotMatch(rctx.Ctx, revStr, request.Domain)
if err != nil {
if err == ErrInvalidRev {
WriteErrResponse(rctx, config.ErrInvalidParams, err.Error())
return
}
WriteErrResponse(rctx, config.ErrInternal, err.Error())
return
}
if revised {
queryAndResponse(rctx, request)
return
} else if wait != "" {
if !isLegalWaitRequest(rctx, request) {
return
}
if watch(rctx, request, wait) {
return
}
rctx.WriteHeader(http.StatusNotModified)
return
} else {
rctx.WriteHeader(http.StatusNotModified)
}
}
func isLegalWaitRequest(rctx *restful.Context, request *model.ListKVRequest) bool {
if request.Key != "" {
WriteErrResponse(rctx, config.ErrInvalidParams, "can not accept key params, when using wait")
return false
}
return true
}
func watch(rctx *restful.Context, request *model.ListKVRequest, wait string) bool {
changed, topic, err := eventHappened(wait, &pubsub.Topic{
Labels: request.Labels,
Project: request.Project,
MatchType: request.Match,
DomainID: request.Domain,
}, rctx.Ctx)
if err != nil {
WriteErrResponse(rctx, config.ErrObserveEvent, err.Error())
return true
}
if changed {
queryFromCache(rctx, topic)
return true
}
return false
}
// Delete deletes one kv by id
func (r *KVResource) Delete(rctx *restful.Context) {
project := rctx.ReadPathParameter(common.PathParameterProject)
domain := ReadDomain(rctx.Ctx)
kvID := rctx.ReadPathParameter(common.PathParamKVID)
err := validateDelete(domain, project, kvID)
if err != nil {
WriteErrResponse(rctx, config.ErrInvalidParams, err.Error())
return
}
kv, err := kvsvc.FindOneAndDelete(rctx.Ctx, kvID, project, domain)
if err != nil {
openlog.Error("delete failed, ", openlog.WithTags(openlog.Tags{
"kvID": kvID,
"error": err.Error(),
}))
if err == datasource.ErrKeyNotExists {
WriteErrResponse(rctx, config.ErrRecordNotExists, err.Error())
return
}
WriteError(rctx, err)
return
}
err = pubsub.Publish(&pubsub.KVChangeEvent{
Key: kv.Key,
Labels: kv.Labels,
Project: project,
DomainID: domain,
Action: pubsub.ActionDelete,
})
if err != nil {
openlog.Warn("lost kv change event:" + err.Error())
}
rctx.WriteHeader(http.StatusNoContent)
}
// DeleteList deletes multiple kvs by ids
func (r *KVResource) DeleteList(rctx *restful.Context) {
project := rctx.ReadPathParameter(common.PathParameterProject)
domain := ReadDomain(rctx.Ctx)
b := new(DeleteBody)
if err := json.NewDecoder(rctx.ReadRequest().Body).Decode(b); err != nil {
WriteErrResponse(rctx, config.ErrInvalidParams, fmt.Sprintf(FmtReadRequestError, err))
return
}
err := validateDeleteList(domain, project)
if err != nil {
WriteErrResponse(rctx, config.ErrInvalidParams, err.Error())
return
}
kvs, err := kvsvc.FindManyAndDelete(rctx.Ctx, b.IDs, project, domain)
if err != nil {
if err == datasource.ErrKeyNotExists {
rctx.WriteHeader(http.StatusNoContent)
return
}
openlog.Error("delete list failed, ", openlog.WithTags(openlog.Tags{
"kvIDs": b.IDs,
"error": err.Error(),
}))
WriteError(rctx, err)
return
}
for _, kv := range kvs {
err = pubsub.Publish(&pubsub.KVChangeEvent{
Key: kv.Key,
Labels: kv.Labels,
Project: project,
DomainID: domain,
Action: pubsub.ActionDelete,
})
if err != nil {
openlog.Warn("lost kv change event:" + err.Error())
}
}
rctx.WriteHeader(http.StatusNoContent)
}
// URLPatterns defined config operations
func (r *KVResource) URLPatterns() []restful.Route {
return []restful.Route{
{
Method: http.MethodPost,
Path: "/v1/{project}/kie/file",
ResourceFunc: r.Upload,
FuncDesc: "upload key values",
Parameters: []*restful.Parameters{
DocPathProject,
DocHeaderContentTypeJSONAndYaml,
},
Read: KVUploadBody{},
Returns: []*restful.Returns{
{
Code: http.StatusOK,
Model: model.DocRespOfUpload{},
},
},
Consumes: []string{goRestful.MIME_JSON, common.ContentTypeYaml},
Produces: []string{goRestful.MIME_JSON, common.ContentTypeYaml},
}, {
Method: http.MethodPost,
Path: "/v1/{project}/kie/kv",
ResourceFunc: r.Post,
FuncDesc: "create a key value",
Parameters: []*restful.Parameters{
DocPathProject, DocHeaderContentTypeJSONAndYaml,
},
Read: KVCreateBody{},
Returns: []*restful.Returns{
{
Code: http.StatusOK,
Model: model.DocResponseSingleKey{},
},
},
Consumes: []string{goRestful.MIME_JSON, common.ContentTypeYaml},
Produces: []string{goRestful.MIME_JSON, common.ContentTypeYaml},
}, {
Method: http.MethodPut,
Path: "/v1/{project}/kie/kv/{kv_id}",
ResourceFunc: r.Put,
FuncDesc: "update a key value",
Parameters: []*restful.Parameters{
DocPathProject, DocPathKeyID, DocHeaderContentTypeJSONAndYaml,
},
Read: KVUpdateBody{},
Returns: []*restful.Returns{
{
Code: http.StatusOK,
Model: model.DocResponseSingleKey{},
},
},
Consumes: []string{goRestful.MIME_JSON, common.ContentTypeYaml},
Produces: []string{goRestful.MIME_JSON, common.ContentTypeYaml},
}, {
Method: http.MethodGet,
Path: "/v1/{project}/kie/kv/{kv_id}",
ResourceFunc: r.Get,
FuncDesc: "get key values by kv_id",
Parameters: []*restful.Parameters{
DocPathProject, DocPathKeyID,
},
Returns: []*restful.Returns{
{
Code: http.StatusOK,
Message: "get key value success",
Model: model.DocResponseSingleKey{},
Headers: map[string]goRestful.Header{
common.HeaderRevision: DocHeaderRevision,
},
},
{
Code: http.StatusNotFound,
Message: "key value not found",
},
},
Produces: []string{goRestful.MIME_JSON},
}, {
Method: http.MethodGet,
Path: "/v1/{project}/kie/kv",
ResourceFunc: r.List,
FuncDesc: "list key values by labels and key",
Parameters: []*restful.Parameters{
DocPathProject, DocQueryKeyParameters, DocQueryStatusParameters, DocQueryLabelParameters,
DocQueryWait, DocQueryMatch, DocQueryRev, DocQueryLimitParameters, DocQueryOffsetParameters,
},
Returns: []*restful.Returns{
{
Code: http.StatusOK,
Model: model.DocResponseGetKey{},
Headers: map[string]goRestful.Header{
common.HeaderRevision: DocHeaderRevision,
},
}, {
Code: http.StatusNotModified,
Message: "empty body",
},
},
Produces: []string{goRestful.MIME_JSON},
}, {
Method: http.MethodDelete,
Path: "/v1/{project}/kie/kv/{kv_id}",
ResourceFunc: r.Delete,
FuncDesc: "delete key by kv ID.",
Parameters: []*restful.Parameters{
DocPathProject,
DocPathKeyID,
},
Returns: []*restful.Returns{
{
Code: http.StatusNoContent,
Message: "delete success",
},
{
Code: http.StatusNotFound,
Message: "no key value found for deletion",
},
{
Code: http.StatusInternalServerError,
Message: "server error",
},
},
}, {
Method: http.MethodDelete,
Path: "/v1/{project}/kie/kv",
ResourceFunc: r.DeleteList,
FuncDesc: "delete keys.",
Parameters: []*restful.Parameters{
DocPathProject, DocHeaderContentTypeJSON,
},
Read: DeleteBody{},
Returns: []*restful.Returns{
{
Code: http.StatusNoContent,
Message: "delete success",
},
{
Code: http.StatusInternalServerError,
Message: "server error",
},
},
Consumes: []string{goRestful.MIME_JSON},
Produces: []string{goRestful.MIME_JSON},
},
}
}