blob: 35ada351d5c7ea495616b2410b71e6e912f46cae [file] [log] [blame]
//Copyright 2017 Huawei Technologies Co., Ltd
//
//Licensed under the Apache License, Version 2.0 (the "License");
//you may not use this file except in compliance with the License.
//You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
//Unless required by applicable law or agreed to in writing, software
//distributed under the License is distributed on an "AS IS" BASIS,
//WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//See the License for the specific language governing permissions and
//limitations under the License.
package registry
import (
"encoding/json"
"github.com/astaxie/beego"
"github.com/coreos/etcd/mvcc/mvccpb"
"golang.org/x/net/context"
"reflect"
"strconv"
"sync"
"time"
"unsafe"
)
var (
RegistryPlugins map[string]func(cfg *Config) Registry
registryInstance Registry
singletonLock sync.Mutex
)
type ActionType int
func (at ActionType) String() string {
switch at {
case GET:
return "GET"
case PUT:
return "PUT"
case DELETE:
return "DELETE"
default:
return "ACTION" + strconv.Itoa(int(at))
}
}
type SortOrder int
func (so SortOrder) String() string {
switch so {
case SORT_NONE:
return "SORT_NONE"
case SORT_ASCEND:
return "SORT_ASCEND"
case SORT_DESCEND:
return "SORT_DESCEND"
default:
return "SORT" + strconv.Itoa(int(so))
}
}
type CompareType int
type CompareResult int
const (
GET ActionType = 0
PUT ActionType = 1
DELETE ActionType = 2
SORT_NONE SortOrder = 0
SORT_ASCEND SortOrder = 1
SORT_DESCEND SortOrder = 2
CMP_VERSION CompareType = 0
CMP_CREATE CompareType = 1
CMP_MOD CompareType = 2
CMP_VALUE CompareType = 3
CMP_EQUAL CompareResult = 0
CMP_GREATER CompareResult = 1
CMP_LESS CompareResult = 2
CMP_NOT_EQUAL CompareResult = 3
REFRESH_MANAGER_CLUSTER_INTERVAL = 30
REQUEST_TIMEOUT = 300
MAX_TXN_NUMBER_ONE_TIME = 128
)
type Registry interface {
Err() <-chan error
Ready() <-chan int
PutNoOverride(ctx context.Context, op *PluginOp) (bool, error)
Do(ctx context.Context, op *PluginOp) (*PluginResponse, error)
Txn(ctx context.Context, ops []*PluginOp) (*PluginResponse, error)
TxnWithCmp(ctx context.Context, success []*PluginOp, cmp []*CompareOp, fail []*PluginOp) (*PluginResponse, error)
LeaseGrant(ctx context.Context, TTL int64) (leaseID int64, err error)
LeaseRenew(ctx context.Context, leaseID int64) (TTL int64, err error)
LeaseRevoke(ctx context.Context, leaseID int64) error
// this function block util:
// 1. connection error
// 2. call send function failed
// 3. response.Err()
// 4. time out to watch, but return nil
Watch(ctx context.Context, op *PluginOp, send func(message string, evt *PluginResponse) error) error
Close()
CompactCluster(ctx context.Context)
Compact(ctx context.Context, revision int64) error
}
type Store interface {
}
type Config struct {
EmbedMode string
ClusterAddresses string
AutoSyncInterval int64
}
type PluginOp struct {
Action ActionType `json:"action"`
Key []byte `json:"key,omitempty"`
EndKey []byte `json:"endKey,omitempty"`
Value []byte `json:"value,omitempty"`
WithPrefix bool `json:"prefix,omitempty"`
WithPrevKV bool `json:"prevKV,omitempty"`
Lease int64 `json:"leaseId,omitempty"`
KeyOnly bool `json:"keyOnly,omitempty"`
CountOnly bool `json:"countOnly,omitempty"`
SortOrder SortOrder `json:"sort,omitempty"`
WithRev int64 `json:"rev,omitempty"`
WithNoCache bool `json:"noCache,omitempty"`
WithIgnoreLease bool `json:"ignoreLease,omitempty"`
}
func (op *PluginOp) String() string {
b, _ := json.Marshal(op)
return BytesToStringWithNoCopy(b)
}
type PluginResponse struct {
Action ActionType
Kvs []*mvccpb.KeyValue
PrevKv *mvccpb.KeyValue // TODO do not support now
Count int64
Revision int64
Succeeded bool
}
type CompareOp struct {
Key []byte
Type CompareType
Result CompareResult
Value interface{}
}
func init() {
RegistryPlugins = make(map[string]func(cfg *Config) Registry)
}
func RegisterCenterClient() (Registry, error) {
registryFunc := RegistryPlugins[beego.AppConfig.String("registry_plugin")]
autoSyncInterval, _ := beego.AppConfig.Int64("auto_sync_interval")
if autoSyncInterval <= 0 {
autoSyncInterval = REFRESH_MANAGER_CLUSTER_INTERVAL
}
instance := registryFunc(&Config{
ClusterAddresses: beego.AppConfig.String("manager_cluster"),
AutoSyncInterval: autoSyncInterval,
})
select {
case err := <-instance.Err():
return nil, err
case <-instance.Ready():
}
return instance, nil
}
func GetRegisterCenter() Registry {
if registryInstance == nil {
singletonLock.Lock()
if registryInstance == nil {
inst, _ := RegisterCenterClient()
registryInstance = inst
}
singletonLock.Unlock()
}
return registryInstance
}
func WithTimeout(ctx context.Context) (context.Context, context.CancelFunc) {
return context.WithTimeout(ctx, REQUEST_TIMEOUT*time.Second)
}
func WithWatchPrefix(key string) *PluginOp {
return &PluginOp{
Action: GET,
Key: []byte(key),
WithPrefix: true,
WithPrevKV: true,
}
}
func BytesToStringWithNoCopy(bytes []byte) (s string) {
pbytes := (*reflect.SliceHeader)(unsafe.Pointer(&bytes))
pstring := (*reflect.StringHeader)(unsafe.Pointer(&s))
pstring.Data = pbytes.Data
pstring.Len = pbytes.Len
return
}
func BatchCommit(ctx context.Context, opts []*PluginOp) error {
lenOpts := len(opts)
tmpLen := lenOpts
tmpOpts := []*PluginOp{}
var err error
for i := 0; tmpLen > 0; i++ {
tmpLen = lenOpts - (i+1)*MAX_TXN_NUMBER_ONE_TIME
if tmpLen > 0 {
tmpOpts = opts[i*MAX_TXN_NUMBER_ONE_TIME : (i+1)*MAX_TXN_NUMBER_ONE_TIME]
} else {
tmpOpts = opts[i*MAX_TXN_NUMBER_ONE_TIME : lenOpts]
}
_, err = GetRegisterCenter().Txn(ctx, tmpOpts)
if err != nil {
return err
}
}
return nil
}