blob: da4612e09b9e6af18a883eedf81e591fe756c3d3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package sd
import (
"context"
"fmt"
"go.mongodb.org/mongo-driver/bson"
md "go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"github.com/apache/servicecomb-service-center/datasource/mongo/client"
"github.com/apache/servicecomb-service-center/datasource/mongo/client/model"
"github.com/apache/servicecomb-service-center/datasource/sdcommon"
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/pkg/util"
)
type mongoListWatch struct {
Key string
resumeToken bson.Raw
}
func (lw *mongoListWatch) List(op sdcommon.ListWatchConfig) (*sdcommon.ListWatchResp, error) {
otCtx, cancel := context.WithTimeout(op.Context, op.Timeout)
defer cancel()
resp, err := client.GetMongoClient().Find(otCtx, lw.Key, bson.M{})
if err != nil {
log.Error(fmt.Sprintf("list key %s failed", lw.Key), err)
return nil, err
}
// convert mongoListResponse to ListWatchResp
lwRsp := &sdcommon.ListWatchResp{}
lwRsp.Resources = make([]*sdcommon.Resource, 0)
for resp.Next(context.Background()) {
info := lw.doParseDocumentToResource(resp.Current)
lwRsp.Resources = append(lwRsp.Resources, &info)
}
return lwRsp, nil
}
func (lw *mongoListWatch) EventBus(op sdcommon.ListWatchConfig) *sdcommon.EventBus {
return sdcommon.NewEventBus(lw, op)
}
func (lw *mongoListWatch) DoWatch(ctx context.Context, f func(*sdcommon.ListWatchResp)) error {
csOptions := &options.ChangeStreamOptions{}
csOptions.SetFullDocument(options.UpdateLookup)
resumeToken := lw.resumeToken
if resumeToken != nil {
csOptions.SetResumeAfter(resumeToken)
}
pipline := md.Pipeline{}
if lw.Key == instance {
// ignore instance refresh_time change event for avoid meaningless instance push.
match := bson.D{{"updateDescription.updatedFields.refresh_time", bson.D{{"$exists", false}}}}
pipline = md.Pipeline{{{"$match", match}}}
}
resp, err := client.GetMongoClient().Watch(ctx, lw.Key, pipline, csOptions)
if err != nil {
log.Error(fmt.Sprintf("watch table %s failed", lw.Key), err)
f(nil)
return err
}
for resp.Next(ctx) {
lw.resumeToken = resp.ResumeToken()
wRsp := &MongoWatchResponse{}
err := bson.Unmarshal(resp.Current, &wRsp)
if err != nil {
log.Error("error to parse bson raw to mongo watch response", err)
return err
}
// convert mongoWatchResponse to ListWatchResp
resource := lw.doParseWatchRspToResource(wRsp)
lwRsp := &sdcommon.ListWatchResp{}
lwRsp.Resources = append(lwRsp.Resources, &resource)
switch wRsp.OperationType {
case insertOp:
lwRsp.Action = sdcommon.ActionCreate
case updateOp:
lwRsp.Action = sdcommon.ActionUpdate
case deleteOp:
lwRsp.Action = sdcommon.ActionDelete
default:
log.Warn(fmt.Sprintf("unrecognized action:%s", lwRsp.Action))
}
f(lwRsp)
}
return err
}
func (lw *mongoListWatch) doParseDocumentToResource(fullDocument bson.Raw) (resource sdcommon.Resource) {
var err error
documentID := MongoDocument{}
err = bson.Unmarshal(fullDocument, &documentID)
if err != nil {
return
}
resource.DocumentID = documentID.ID.Hex()
switch lw.Key {
case instance:
instance := model.Instance{}
err = bson.Unmarshal(fullDocument, &instance)
if err != nil {
log.Error("error to parse bson raw to documentInfo", err)
return
}
if instance.Instance == nil {
log.Error(fmt.Sprintf("unexpect instance value,the documentID is %s", resource.DocumentID), nil)
return
}
resource.Key = instance.Instance.InstanceId
resource.Value = instance
resource.Index = instance.Instance.ServiceId
case service:
service := model.Service{}
err := bson.Unmarshal(fullDocument, &service)
if err != nil {
log.Error("error to parse bson raw to documentInfo", err)
return
}
if service.Service == nil {
log.Error(fmt.Sprintf("unexpect service value,the documentID is %s", resource.DocumentID), nil)
return
}
resource.Key = service.Service.ServiceId
resource.Value = service
resource.Index = util.StringJoin([]string{service.Domain, service.Project, service.Service.ServiceName, service.Service.Version, service.Service.AppId, service.Service.Environment}, "/")
default:
return
}
return
}
func (lw *mongoListWatch) ResumeToken() bson.Raw {
return lw.resumeToken
}
func (lw *mongoListWatch) setResumeToken(resumeToken bson.Raw) {
lw.resumeToken = resumeToken
}
func (lw *mongoListWatch) doParseWatchRspToResource(wRsp *MongoWatchResponse) (resource sdcommon.Resource) {
switch wRsp.OperationType {
case deleteOp:
//delete operation has no fullDocumentValue
resource.DocumentID = wRsp.DocumentKey.ID.Hex()
return
case insertOp, updateOp, replaceOp:
return lw.doParseDocumentToResource(wRsp.FullDocument)
default:
log.Warn(fmt.Sprintf("unrecognized operation:%s", wRsp.OperationType))
}
return
}