| //Copyright 2017 Huawei Technologies Co., Ltd |
| // |
| //Licensed under the Apache License, Version 2.0 (the "License"); |
| //you may not use this file except in compliance with the License. |
| //You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| //Unless required by applicable law or agreed to in writing, software |
| //distributed under the License is distributed on an "AS IS" BASIS, |
| //WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| //See the License for the specific language governing permissions and |
| //limitations under the License. |
| package etcd |
| |
| import ( |
| "crypto/tls" |
| "errors" |
| "fmt" |
| "github.com/ServiceComb/service-center/pkg/common" |
| "github.com/ServiceComb/service-center/server/core/registry" |
| "github.com/ServiceComb/service-center/util" |
| "github.com/ServiceComb/service-center/util/rest" |
| "github.com/astaxie/beego" |
| "github.com/coreos/etcd/clientv3" |
| "github.com/coreos/etcd/mvcc/mvccpb" |
| "golang.org/x/net/context" |
| "strings" |
| "time" |
| ) |
| |
const (
	// REGISTRY_PLUGIN_ETCD is the key under which init registers NewRegistry
	// in registry.RegistryPlugins.
	REGISTRY_PLUGIN_ETCD = "etcd"
	// CONNECT_MANAGER_SERVER_TIMEOUT is the etcd client dial timeout, in seconds
	// (it is multiplied by time.Second in newClient).
	CONNECT_MANAGER_SERVER_TIMEOUT = 10
	// DEFAULT_PAGE_COUNT is the page size Do passes to paging for large prefix reads:
	// grpc does not allow transporting a body larger than 4MB in a request.
	DEFAULT_PAGE_COUNT = 5000
)

// clientTLSConfig is the TLS configuration handed to clientv3 by newClient.
// NewRegistry assigns it when client SSL is enabled and an https:// address is
// configured; otherwise it stays nil.
var clientTLSConfig *tls.Config
| |
| func init() { |
| util.LOGGER.Infof("etcd plugin init.") |
| registry.RegistryPlugins[REGISTRY_PLUGIN_ETCD] = NewRegistry |
| } |
| |
| type EtcdClient struct { |
| Client *clientv3.Client |
| err chan error |
| ready chan int |
| } |
| |
| func (s *EtcdClient) Err() <-chan error { |
| return s.err |
| } |
| |
| func (s *EtcdClient) Ready() <-chan int { |
| return s.ready |
| } |
| |
| func (s *EtcdClient) Close() { |
| if s.Client != nil { |
| s.Client.Close() |
| } |
| util.LOGGER.Debugf("etcd client stopped.") |
| } |
| |
| func (c *EtcdClient) CompactCluster(ctx context.Context) { |
| for _, ep := range c.Client.Endpoints() { |
| otCtx, cancel := registry.WithTimeout(ctx) |
| defer cancel() |
| mapi := clientv3.NewMaintenance(c.Client) |
| resp, err := mapi.Status(otCtx, ep) |
| if err != nil { |
| util.LOGGER.Error(fmt.Sprintf("Compact error ,can not get status from %s", ep), err) |
| continue |
| } |
| curRev := resp.Header.Revision |
| util.LOGGER.Debug(fmt.Sprintf("Compacting.... endpoint: %s / IsLeader: %v\n / revision is %d", ep, resp.Header.MemberId == resp.Leader, curRev)) |
| c.Compact(ctx, curRev) |
| } |
| |
| } |
| |
| func (c *EtcdClient) Compact(ctx context.Context, revision int64) error { |
| otCtx, cancel := registry.WithTimeout(ctx) |
| defer cancel() |
| revToCompact := max(0, revision-beego.AppConfig.DefaultInt64("compact_index_delta", 100)) |
| if revToCompact <= 0 { |
| util.LOGGER.Warnf(nil, "revToCompact is %d, <=0, no nead to compact.", revToCompact) |
| return nil |
| } |
| util.LOGGER.Debug(fmt.Sprintf("Compacting %d", revToCompact)) |
| resp, err := c.Client.KV.Compact(otCtx, revToCompact) |
| if err != nil { |
| return err |
| } |
| util.LOGGER.Debugf(fmt.Sprintf("Compacted %v", resp)) |
| return nil |
| } |
| |
// max returns the larger of n1 and n2.
func max(n1, n2 int64) int64 {
	if n1 <= n2 {
		return n2
	}
	return n1
}
| |
| func (s *EtcdClient) toGetRequest(op *registry.PluginOp) []clientv3.OpOption { |
| opts := []clientv3.OpOption{} |
| if op.WithPrefix { |
| opts = append(opts, clientv3.WithPrefix()) |
| } else if len(op.EndKey) > 0 { |
| opts = append(opts, clientv3.WithRange(clientv3.GetPrefixRangeEnd(registry.BytesToStringWithNoCopy(op.EndKey)))) |
| } |
| if op.WithPrevKV { |
| opts = append(opts, clientv3.WithPrevKV()) |
| } |
| if op.KeyOnly { |
| opts = append(opts, clientv3.WithKeysOnly()) |
| } |
| if op.CountOnly { |
| opts = append(opts, clientv3.WithCountOnly()) |
| } |
| if op.WithRev > 0 { |
| opts = append(opts, clientv3.WithRev(op.WithRev)) |
| } |
| switch op.SortOrder { |
| case registry.SORT_ASCEND: |
| opts = append(opts, clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend)) |
| case registry.SORT_DESCEND: |
| opts = append(opts, clientv3.WithSort(clientv3.SortByKey, clientv3.SortDescend)) |
| } |
| return opts |
| } |
| |
| func (s *EtcdClient) toPutRequest(op *registry.PluginOp) []clientv3.OpOption { |
| opts := []clientv3.OpOption{} |
| if op.WithPrevKV { |
| opts = append(opts, clientv3.WithPrevKV()) |
| } |
| if op.Lease > 0 { |
| opts = append(opts, clientv3.WithLease(clientv3.LeaseID(op.Lease))) |
| } |
| if op.WithIgnoreLease { |
| // TODO WithIgnoreLease support |
| } |
| return opts |
| } |
| |
| func (s *EtcdClient) toDeleteRequest(op *registry.PluginOp) []clientv3.OpOption { |
| opts := []clientv3.OpOption{} |
| if op.WithPrefix { |
| opts = append(opts, clientv3.WithPrefix()) |
| } |
| if op.WithPrevKV { |
| opts = append(opts, clientv3.WithPrevKV()) |
| } |
| return opts |
| } |
| |
| func (c *EtcdClient) toTxnRequest(opts []*registry.PluginOp) []clientv3.Op { |
| etcdOps := []clientv3.Op{} |
| for _, op := range opts { |
| switch op.Action { |
| case registry.GET: |
| etcdOps = append(etcdOps, clientv3.OpGet(registry.BytesToStringWithNoCopy(op.Key), c.toGetRequest(op)...)) |
| case registry.PUT: |
| var value string = "" |
| if len(op.Value) > 0 { |
| value = registry.BytesToStringWithNoCopy(op.Value) |
| } |
| etcdOps = append(etcdOps, clientv3.OpPut(registry.BytesToStringWithNoCopy(op.Key), value, c.toPutRequest(op)...)) |
| case registry.DELETE: |
| etcdOps = append(etcdOps, clientv3.OpDelete(registry.BytesToStringWithNoCopy(op.Key), c.toDeleteRequest(op)...)) |
| } |
| } |
| return etcdOps |
| } |
| |
| func (c *EtcdClient) toCompares(cmps []*registry.CompareOp) []clientv3.Cmp { |
| etcdCmps := []clientv3.Cmp{} |
| for _, cmp := range cmps { |
| var cmpType clientv3.Cmp |
| var cmpResult string |
| key := registry.BytesToStringWithNoCopy(cmp.Key) |
| switch cmp.Type { |
| case registry.CMP_VERSION: |
| cmpType = clientv3.Version(key) |
| case registry.CMP_CREATE: |
| cmpType = clientv3.CreateRevision(key) |
| case registry.CMP_MOD: |
| cmpType = clientv3.ModRevision(key) |
| case registry.CMP_VALUE: |
| cmpType = clientv3.Value(key) |
| } |
| switch cmp.Result { |
| case registry.CMP_EQUAL: |
| cmpResult = "=" |
| case registry.CMP_GREATER: |
| cmpResult = ">" |
| case registry.CMP_LESS: |
| cmpResult = "<" |
| case registry.CMP_NOT_EQUAL: |
| cmpResult = "!=" |
| } |
| etcdCmps = append(etcdCmps, clientv3.Compare(cmpType, cmpResult, cmp.Value)) |
| } |
| return etcdCmps |
| } |
| |
| func (c *EtcdClient) PutNoOverride(ctx context.Context, op *registry.PluginOp) (bool, error) { |
| resp, err := c.TxnWithCmp(ctx, []*registry.PluginOp{op}, []*registry.CompareOp{ |
| { |
| Key: op.Key, |
| Type: registry.CMP_CREATE, |
| Result: registry.CMP_EQUAL, |
| Value: 0, |
| }, |
| }, nil) |
| if err != nil { |
| util.LOGGER.Errorf(err, "PutNoOverride %s failed", op.Key) |
| return false, err |
| } |
| util.LOGGER.Infof("response %s %v %v", op.Key, resp.Succeeded, resp.Revision) |
| return resp.Succeeded, nil |
| } |
| |
| func (c *EtcdClient) paging(ctx context.Context, op *registry.PluginOp, countPerPage int) (*clientv3.GetResponse, error) { |
| var etcdResp *clientv3.GetResponse |
| key := registry.BytesToStringWithNoCopy(op.Key) |
| |
| tempOp := *op |
| tempOp.CountOnly = true |
| coutResp, err := c.Client.Get(ctx, key, c.toGetRequest(&tempOp)...) |
| if err != nil { |
| return nil, err |
| } |
| |
| recordCount := int(coutResp.Count) |
| if recordCount < countPerPage { |
| return nil, nil // no paging |
| } |
| |
| util.LOGGER.Debugf("get too many KeyValues from etcdserver, now paging.(%d vs %d)", |
| recordCount, DEFAULT_PAGE_COUNT) |
| |
| tempOp.KeyOnly = false |
| tempOp.CountOnly = false |
| tempOp.WithPrefix = false |
| tempOp.SortOrder = registry.SORT_ASCEND |
| tempOp.EndKey = op.Key |
| tempOp.WithRev = coutResp.Header.Revision |
| |
| etcdResp = coutResp |
| etcdResp.Kvs = make([]*mvccpb.KeyValue, 0, etcdResp.Count) |
| |
| pageCount := recordCount / countPerPage |
| remainCount := recordCount % countPerPage |
| if remainCount > 0 { |
| pageCount++ |
| } |
| |
| baseOps := []clientv3.OpOption{} |
| baseOps = append(baseOps, c.toGetRequest(&tempOp)...) |
| |
| for i := 0; i < pageCount; i++ { |
| limit := countPerPage |
| if i == pageCount-1 { |
| limit = remainCount |
| } |
| ops := append(baseOps, clientv3.WithLimit(int64(limit))) |
| recordResp, err := c.Client.Get(ctx, key, ops...) |
| if err != nil { |
| return nil, err |
| } |
| l := int64(len(recordResp.Kvs)) |
| nextKey := recordResp.Kvs[l-1].Key |
| key = clientv3.GetPrefixRangeEnd(registry.BytesToStringWithNoCopy(nextKey)) |
| etcdResp.Kvs = append(etcdResp.Kvs, recordResp.Kvs...) |
| } |
| |
| // too slow |
| if op.SortOrder == registry.SORT_DESCEND { |
| t := time.Now() |
| var last int |
| for i := 0; i < recordCount; i++ { |
| last = recordCount - i - 1 |
| if last <= i { |
| break |
| } |
| etcdResp.Kvs[i], etcdResp.Kvs[last] = etcdResp.Kvs[last], etcdResp.Kvs[i] |
| } |
| util.LOGGER.Debugf("sorted %d KeyValues spend %s", recordCount, time.Now().Sub(t)) |
| } |
| return etcdResp, nil |
| } |
| |
| func (c *EtcdClient) Do(ctx context.Context, op *registry.PluginOp) (*registry.PluginResponse, error) { |
| otCtx, cancel := registry.WithTimeout(ctx) |
| defer cancel() |
| var err error |
| var resp *registry.PluginResponse |
| switch op.Action { |
| case registry.GET: |
| var etcdResp *clientv3.GetResponse |
| key := registry.BytesToStringWithNoCopy(op.Key) |
| |
| if op.WithPrefix && !op.CountOnly && !op.KeyOnly { |
| etcdResp, err = c.paging(ctx, op, DEFAULT_PAGE_COUNT) |
| if err != nil { |
| break |
| } |
| } |
| |
| if etcdResp == nil { |
| etcdResp, err = c.Client.Get(otCtx, key, c.toGetRequest(op)...) |
| if err != nil { |
| break |
| } |
| } |
| |
| resp = ®istry.PluginResponse{ |
| Kvs: etcdResp.Kvs, |
| Count: etcdResp.Count, |
| Revision: etcdResp.Header.Revision, |
| } |
| case registry.PUT: |
| var value string = "" |
| if len(op.Value) > 0 { |
| value = registry.BytesToStringWithNoCopy(op.Value) |
| } |
| var etcdResp *clientv3.PutResponse |
| etcdResp, err = c.Client.Put(otCtx, registry.BytesToStringWithNoCopy(op.Key), value, c.toPutRequest(op)...) |
| if err != nil { |
| break |
| } |
| resp = ®istry.PluginResponse{ |
| PrevKv: etcdResp.PrevKv, |
| Revision: etcdResp.Header.Revision, |
| } |
| case registry.DELETE: |
| var etcdResp *clientv3.DeleteResponse |
| etcdResp, err = c.Client.Delete(otCtx, registry.BytesToStringWithNoCopy(op.Key), c.toDeleteRequest(op)...) |
| if err != nil { |
| break |
| } |
| resp = ®istry.PluginResponse{ |
| Revision: etcdResp.Header.Revision, |
| } |
| } |
| if err != nil { |
| return nil, err |
| } |
| resp.Succeeded = true |
| return resp, nil |
| } |
| |
| func (c *EtcdClient) Txn(ctx context.Context, opts []*registry.PluginOp) (*registry.PluginResponse, error) { |
| resp, err := c.TxnWithCmp(ctx, opts, nil, nil) |
| if err != nil { |
| return nil, err |
| } |
| return ®istry.PluginResponse{ |
| Succeeded: resp.Succeeded, |
| Revision: resp.Revision, |
| }, nil |
| } |
| |
| func (c *EtcdClient) TxnWithCmp(ctx context.Context, success []*registry.PluginOp, cmps []*registry.CompareOp, fail []*registry.PluginOp) (*registry.PluginResponse, error) { |
| otCtx, cancel := registry.WithTimeout(ctx) |
| defer cancel() |
| |
| etcdCmps := c.toCompares(cmps) |
| etcdSuccessOps := c.toTxnRequest(success) |
| etcdFailOps := c.toTxnRequest(fail) |
| |
| kvc := clientv3.NewKV(c.Client) |
| txn := kvc.Txn(otCtx) |
| if len(etcdCmps) > 0 { |
| txn.If(etcdCmps...) |
| } |
| txn.Then(etcdSuccessOps...) |
| if len(etcdFailOps) > 0 { |
| txn.Else(etcdFailOps...) |
| } |
| resp, err := txn.Commit() |
| if err != nil { |
| return nil, err |
| } |
| return ®istry.PluginResponse{ |
| Succeeded: resp.Succeeded, |
| Revision: resp.Header.Revision, |
| }, nil |
| } |
| |
| func (c *EtcdClient) LeaseGrant(ctx context.Context, TTL int64) (int64, error) { |
| otCtx, cancel := registry.WithTimeout(ctx) |
| defer cancel() |
| etcdResp, err := c.Client.Grant(otCtx, TTL) |
| if err != nil { |
| return 0, err |
| } |
| return int64(etcdResp.ID), nil |
| } |
| |
| func (c *EtcdClient) LeaseRenew(ctx context.Context, leaseID int64) (int64, error) { |
| otCtx, cancel := registry.WithTimeout(ctx) |
| defer cancel() |
| etcdResp, err := c.Client.KeepAliveOnce(otCtx, clientv3.LeaseID(leaseID)) |
| if err != nil { |
| return 0, err |
| } |
| return etcdResp.TTL, nil |
| } |
| |
| func (c *EtcdClient) LeaseRevoke(ctx context.Context, leaseID int64) error { |
| otCtx, cancel := registry.WithTimeout(ctx) |
| defer cancel() |
| _, err := c.Client.Revoke(otCtx, clientv3.LeaseID(leaseID)) |
| if err != nil { |
| return err |
| } |
| return nil |
| } |
| |
| func (c *EtcdClient) Watch(ctx context.Context, op *registry.PluginOp, send func(message string, evt *registry.PluginResponse) error) (err error) { |
| n := len(op.Key) |
| if n > 0 { |
| // 必须创建新的client连接 |
| /*client, err := newClient(c.Client.Endpoints()) |
| if err != nil { |
| util.LOGGER.Error("get manager client failed", err) |
| return err |
| } |
| defer client.Close()*/ |
| // 现在跟ETCD仅使用一个连接,共用client即可 |
| client := clientv3.NewWatcher(c.Client) |
| defer client.Close() |
| |
| key := registry.BytesToStringWithNoCopy(op.Key) |
| if op.WithPrefix && key[len(key)-1] != '/' { |
| key += "/" |
| } |
| util.LOGGER.Debugf("start to watch key %s", key) |
| |
| // 不能设置超时context,内部判断了连接超时和watch超时 |
| ws := client.Watch(context.Background(), key, c.toGetRequest(op)...) |
| |
| var ok bool |
| var resp clientv3.WatchResponse |
| for { |
| select { |
| case <-ctx.Done(): |
| return |
| case resp, ok = <-ws: |
| if !ok { |
| err := errors.New("channel is closed") |
| return err |
| } |
| if err = resp.Err(); err != nil { |
| return err |
| } |
| for _, evt := range resp.Events { |
| pbEvent := ®istry.PluginResponse{ |
| Action: registry.PUT, |
| Kvs: []*mvccpb.KeyValue{evt.Kv}, |
| PrevKv: evt.PrevKv, |
| Count: 1, |
| Revision: evt.Kv.ModRevision, |
| Succeeded: true, |
| } |
| if evt.Type == mvccpb.DELETE { |
| pbEvent.Action = registry.DELETE |
| } |
| |
| err = send("key information changed", pbEvent) |
| if err != nil { |
| return |
| } |
| } |
| } |
| } |
| } |
| err = fmt.Errorf("no key has been watched") |
| return |
| } |
| |
| func NewRegistry(cfg *registry.Config) registry.Registry { |
| util.LOGGER.Warnf(nil, "starting service center in proxy mode") |
| |
| inst := &EtcdClient{ |
| err: make(chan error, 1), |
| ready: make(chan int), |
| } |
| addrs := strings.Split(cfg.ClusterAddresses, ",") |
| |
| if common.GetClientSSLConfig().SSLEnabled && strings.Index(cfg.ClusterAddresses, "https://") >= 0 { |
| var err error |
| // go client tls限制,提供身份证书、不认证服务端、不校验CN |
| clientTLSConfig, err = rest.GetClientTLSConfig(common.GetClientSSLConfig().VerifyClient, true, false) |
| if err != nil { |
| util.LOGGER.Error("get etcd client tls config failed", err) |
| inst.err <- err |
| return inst |
| } |
| } |
| |
| endpoints := []string{} |
| for _, addr := range addrs { |
| if strings.Index(addr, "://") > 0 { |
| // 如果配置格式为"sr-0=http(s)://IP:Port",则需要分离IP:Port部分 |
| endpoints = append(endpoints, addr[strings.Index(addr, "://")+3:]) |
| } else { |
| endpoints = append(endpoints, addr) |
| } |
| |
| } |
| refreshManagerClusterInterval := cfg.AutoSyncInterval |
| util.LOGGER.Debugf("refreshManagerClusterInterval is %d", refreshManagerClusterInterval) |
| client, err := newClient(endpoints, refreshManagerClusterInterval) |
| if err != nil { |
| util.LOGGER.Errorf(err, "get etcd client %+v failed.", endpoints) |
| inst.err <- err |
| return inst |
| } |
| |
| util.LOGGER.Warnf(nil, "get etcd client %+v completed.", endpoints) |
| inst.Client = client |
| close(inst.ready) |
| return inst |
| } |
| |
| func newClient(endpoints []string, autoSyncInterval int64) (*clientv3.Client, error) { |
| client, err := clientv3.New(clientv3.Config{ |
| Endpoints: endpoints, |
| DialTimeout: CONNECT_MANAGER_SERVER_TIMEOUT * time.Second, |
| TLS: clientTLSConfig, // 暂时与API Server共用一套证书 |
| AutoSyncInterval: time.Duration(autoSyncInterval) * time.Second, |
| }) |
| if err != nil { |
| return nil, err |
| } |
| return client, nil |
| } |